Blob Blame History Raw
From 4c91bc3bf8c6005db5795fe51632c1feedc4719e Mon Sep 17 00:00:00 2001
From: Matteo Croce <mcroce@redhat.com>
Date: Tue, 18 Sep 2018 14:56:37 +0200
Subject: [PATCH v2] dpif-netlink: don't allocate per thread netlink sockets

When using the kernel datapath, OVS allocates a pool of sockets to handle
netlink events. The number of sockets is: ports * n-handler-threads, where
n-handler-threads is user configurable and defaults to 3/4*number of cores.

This because vswitchd starts n-handler-threads threads, each one with a
netlink socket for every port of the switch. Every thread then, starts
listening on events on its set of sockets with epoll().

On setup with lot of CPUs and ports, the number of sockets easily hits
the process file descriptor limit, and ovs-vswitchd will exit with -EMFILE.

Change the number of allocated sockets to just one per port by moving
the socket array from a per handler structure to a per datapath one,
and let all the handlers share the same sockets by using EPOLLEXCLUSIVE
epoll flag which avoids duplicate events, on systems that support it.

The patch was tested on a 56 core machine running Linux 4.18 and latest
Open vSwitch. A bridge was created with 2000+ ports, some of them being
veth interfaces with the peer outside the bridge. The latency of the upcall
is measured by setting a single 'action=controller,local' OpenFlow rule to
force all the packets going to the slow path and then to the local port.
A tool[1] injects some packets to the veth outside the bridge, and measures
the delay until the packet is captured on the local port. The rx timestamp
is get from the socket ancillary data in the attribute SO_TIMESTAMPNS, to
avoid having the scheduler delay in the measured time.

The first test measures the average latency for an upcall generated from
a single port. To measure it 100k packets, one every msec, are sent to a
single port and the latencies are measured.

The second test is meant to check latency fairness among ports, namely if
latency is equal between ports or if some ports have lower priority.
The previous test is repeated for every port, the average of the average
latencies and the standard deviation between averages is measured.

The third test serves to measure responsiveness under load. Heavy traffic
is sent through all ports, latency and packet loss is measured
on a single idle port.

The fourth test is all about fairness. Heavy traffic is injected in all
ports but one, latency and packet loss is measured on the single idle port.

This is the test setup:

  # nproc
  56
  # ovs-vsctl show |grep -c Port
  2223
  # ovs-ofctl dump-flows ovs_upc_br
   cookie=0x0, duration=4.827s, table=0, n_packets=0, n_bytes=0, actions=CONTROLLER:65535,LOCAL
  # uname -a
  Linux fc28 4.18.7-200.fc28.x86_64 #1 SMP Mon Sep 10 15:44:45 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux

And these are the results of the tests:

                                          Stock OVS                 Patched
  netlink sockets
  in use by vswitchd
  lsof -p $(pidof ovs-vswitchd) \
      |grep -c GENERIC                        91187                    2227

  Test 1
  one port latency
  min/avg/max/mdev (us)           2.7/6.6/238.7/1.8       1.6/6.8/160.6/1.7

  Test 2
  all port
  avg latency/mdev (us)                   6.51/0.97               6.86/0.17

  Test 3
  single port latency
  under load
  avg/mdev (us)                             7.5/5.9                 3.8/4.8
  packet loss                                  95 %                    62 %

  Test 4
  idle port latency
  under load
  min/avg/max/mdev (us)           0.8/1.5/210.5/0.9       1.0/2.1/344.5/1.2
  packet loss                                  94 %                     4 %

CPU and RAM usage seems not to be affected, the resource usage of vswitchd
idle with 2000+ ports is unchanged:

  # ps u $(pidof ovs-vswitchd)
  USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
  openvsw+  5430 54.3  0.3 4263964 510968 pts/1  RLl+ 16:20   0:50 ovs-vswitchd

Additionally, to check if vswitchd is thread safe with this patch, the
following test was run for circa 48 hours: on a 56 core machine, a
bridge with kernel datapath is filled with 2200 dummy interfaces and 22
veth, then 22 traffic generators are run in parallel piping traffic into
the veths peers outside the bridge.
To generate as many upcalls as possible, all packets were forced to the
slowpath with an openflow rule like 'action=controller,local' and packet
size was set to 64 byte. Also, to avoid overflowing the FDB early and
slowing down the upcall processing, generated mac addresses were restricted
to a small interval. vswitchd ran without problems for 48+ hours,
obviously with all the handler threads with almost 99% CPU usage.

[1] https://github.com/teknoraver/network-tools/blob/master/weed.c

Signed-off-by: Matteo Croce <mcroce@redhat.com>
---
v1 -> v2:
  - define EPOLLEXCLUSIVE on systems with older kernel headers
  - explain the thread safety test in the commit message

 lib/dpif-netlink.c | 311 ++++++++++++---------------------------------
 1 file changed, 82 insertions(+), 229 deletions(-)

diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
index e6d5a6ec5..bb565ffee 100644
--- a/lib/dpif-netlink.c
+++ b/lib/dpif-netlink.c
@@ -78,6 +78,10 @@ enum { MAX_PORTS = USHRT_MAX };
 #define FLOW_DUMP_MAX_BATCH 50
 #define OPERATE_MAX_OPS 50
 
+#ifndef EPOLLEXCLUSIVE
+#define EPOLLEXCLUSIVE (1u << 28)
+#endif
+
 struct dpif_netlink_dp {
     /* Generic Netlink header. */
     uint8_t cmd;
@@ -170,7 +174,6 @@ struct dpif_windows_vport_sock {
 #endif
 
 struct dpif_handler {
-    struct dpif_channel *channels;/* Array of channels for each handler. */
     struct epoll_event *epoll_events;
     int epoll_fd;                 /* epoll fd that includes channel socks. */
     int n_events;                 /* Num events returned by epoll_wait(). */
@@ -193,6 +196,7 @@ struct dpif_netlink {
     struct fat_rwlock upcall_lock;
     struct dpif_handler *handlers;
     uint32_t n_handlers;           /* Num of upcall handlers. */
+    struct dpif_channel *channels; /* Array of channels for each port. */
     int uc_array_size;             /* Size of 'handler->channels' and */
                                    /* 'handler->epoll_events'. */
 
@@ -331,43 +335,6 @@ open_dpif(const struct dpif_netlink_dp *dp, struct dpif **dpifp)
     return 0;
 }
 
-/* Destroys the netlink sockets pointed by the elements in 'socksp'
- * and frees the 'socksp'.  */
-static void
-vport_del_socksp__(struct nl_sock **socksp, uint32_t n_socks)
-{
-    size_t i;
-
-    for (i = 0; i < n_socks; i++) {
-        nl_sock_destroy(socksp[i]);
-    }
-
-    free(socksp);
-}
-
-/* Creates an array of netlink sockets.  Returns an array of the
- * corresponding pointers.  Records the error in 'error'. */
-static struct nl_sock **
-vport_create_socksp__(uint32_t n_socks, int *error)
-{
-    struct nl_sock **socksp = xzalloc(n_socks * sizeof *socksp);
-    size_t i;
-
-    for (i = 0; i < n_socks; i++) {
-        *error = nl_sock_create(NETLINK_GENERIC, &socksp[i]);
-        if (*error) {
-            goto error;
-        }
-    }
-
-    return socksp;
-
-error:
-    vport_del_socksp__(socksp, n_socks);
-
-    return NULL;
-}
-
 #ifdef _WIN32
 static void
 vport_delete_sock_pool(struct dpif_handler *handler)
@@ -422,129 +389,34 @@ error:
     vport_delete_sock_pool(handler);
     return error;
 }
-
-/* Returns an array pointers to netlink sockets.  The sockets are picked from a
- * pool. Records the error in 'error'. */
-static struct nl_sock **
-vport_create_socksp_windows(struct dpif_netlink *dpif, int *error)
-    OVS_REQ_WRLOCK(dpif->upcall_lock)
-{
-    uint32_t n_socks = dpif->n_handlers;
-    struct nl_sock **socksp;
-    size_t i;
-
-    ovs_assert(n_socks <= 1);
-    socksp = xzalloc(n_socks * sizeof *socksp);
-
-    /* Pick netlink sockets to use in a round-robin fashion from each
-     * handler's pool of sockets. */
-    for (i = 0; i < n_socks; i++) {
-        struct dpif_handler *handler = &dpif->handlers[i];
-        struct dpif_windows_vport_sock *sock_pool = handler->vport_sock_pool;
-        size_t index = handler->last_used_pool_idx;
-
-        /* A pool of sockets is allocated when the handler is initialized. */
-        if (sock_pool == NULL) {
-            free(socksp);
-            *error = EINVAL;
-            return NULL;
-        }
-
-        ovs_assert(index < VPORT_SOCK_POOL_SIZE);
-        socksp[i] = sock_pool[index].nl_sock;
-        socksp[i] = sock_pool[index].nl_sock;
-        ovs_assert(socksp[i]);
-        index = (index == VPORT_SOCK_POOL_SIZE - 1) ? 0 : index + 1;
-        handler->last_used_pool_idx = index;
-    }
-
-    return socksp;
-}
-
-static void
-vport_del_socksp_windows(struct dpif_netlink *dpif, struct nl_sock **socksp)
-{
-    free(socksp);
-}
 #endif /* _WIN32 */
 
-static struct nl_sock **
-vport_create_socksp(struct dpif_netlink *dpif, int *error)
-{
-#ifdef _WIN32
-    return vport_create_socksp_windows(dpif, error);
-#else
-    return vport_create_socksp__(dpif->n_handlers, error);
-#endif
-}
-
-static void
-vport_del_socksp(struct dpif_netlink *dpif, struct nl_sock **socksp)
-{
-#ifdef _WIN32
-    vport_del_socksp_windows(dpif, socksp);
-#else
-    vport_del_socksp__(socksp, dpif->n_handlers);
-#endif
-}
-
-/* Given the array of pointers to netlink sockets 'socksp', returns
- * the array of corresponding pids. If the 'socksp' is NULL, returns
- * a single-element array of value 0. */
-static uint32_t *
-vport_socksp_to_pids(struct nl_sock **socksp, uint32_t n_socks)
-{
-    uint32_t *pids;
-
-    if (!socksp) {
-        pids = xzalloc(sizeof *pids);
-    } else {
-        size_t i;
-
-        pids = xzalloc(n_socks * sizeof *pids);
-        for (i = 0; i < n_socks; i++) {
-            pids[i] = nl_sock_pid(socksp[i]);
-        }
-    }
-
-    return pids;
-}
-
-/* Given the port number 'port_idx', extracts the pids of netlink sockets
- * associated to the port and assigns it to 'upcall_pids'. */
+/* Given the port number 'port_idx', extracts the pid of netlink socket
+ * associated to the port and assigns it to 'upcall_pid'. */
 static bool
-vport_get_pids(struct dpif_netlink *dpif, uint32_t port_idx,
-               uint32_t **upcall_pids)
+vport_get_pid(struct dpif_netlink *dpif, uint32_t port_idx,
+              uint32_t *upcall_pid)
 {
-    uint32_t *pids;
-    size_t i;
-
     /* Since the nl_sock can only be assigned in either all
-     * or none "dpif->handlers" channels, the following check
+     * or none "dpif" channels, the following check
      * would suffice. */
-    if (!dpif->handlers[0].channels[port_idx].sock) {
+    if (!dpif->channels[port_idx].sock) {
         return false;
     }
     ovs_assert(!WINDOWS || dpif->n_handlers <= 1);
 
-    pids = xzalloc(dpif->n_handlers * sizeof *pids);
-
-    for (i = 0; i < dpif->n_handlers; i++) {
-        pids[i] = nl_sock_pid(dpif->handlers[i].channels[port_idx].sock);
-    }
-
-    *upcall_pids = pids;
+    *upcall_pid = nl_sock_pid(dpif->channels[port_idx].sock);
 
     return true;
 }
 
 static int
-vport_add_channels(struct dpif_netlink *dpif, odp_port_t port_no,
-                   struct nl_sock **socksp)
+vport_add_channel(struct dpif_netlink *dpif, odp_port_t port_no,
+                  struct nl_sock *socksp)
 {
     struct epoll_event event;
     uint32_t port_idx = odp_to_u32(port_no);
-    size_t i, j;
+    size_t i;
     int error;
 
     if (dpif->handlers == NULL) {
@@ -553,7 +425,7 @@ vport_add_channels(struct dpif_netlink *dpif, odp_port_t port_no,
 
     /* We assume that the datapath densely chooses port numbers, which can
      * therefore be used as an index into 'channels' and 'epoll_events' of
-     * 'dpif->handler'. */
+     * 'dpif'. */
     if (port_idx >= dpif->uc_array_size) {
         uint32_t new_size = port_idx + 1;
 
@@ -563,15 +435,15 @@ vport_add_channels(struct dpif_netlink *dpif, odp_port_t port_no,
             return EFBIG;
         }
 
-        for (i = 0; i < dpif->n_handlers; i++) {
-            struct dpif_handler *handler = &dpif->handlers[i];
+        dpif->channels = xrealloc(dpif->channels,
+                                  new_size * sizeof *dpif->channels);
 
-            handler->channels = xrealloc(handler->channels,
-                                         new_size * sizeof *handler->channels);
+        for (i = dpif->uc_array_size; i < new_size; i++) {
+            dpif->channels[i].sock = NULL;
+        }
 
-            for (j = dpif->uc_array_size; j < new_size; j++) {
-                handler->channels[j].sock = NULL;
-            }
+        for (i = 0; i < dpif->n_handlers; i++) {
+            struct dpif_handler *handler = &dpif->handlers[i];
 
             handler->epoll_events = xrealloc(handler->epoll_events,
                 new_size * sizeof *handler->epoll_events);
@@ -581,33 +453,33 @@ vport_add_channels(struct dpif_netlink *dpif, odp_port_t port_no,
     }
 
     memset(&event, 0, sizeof event);
-    event.events = EPOLLIN;
+    event.events = EPOLLIN | EPOLLEXCLUSIVE;
     event.data.u32 = port_idx;
 
     for (i = 0; i < dpif->n_handlers; i++) {
         struct dpif_handler *handler = &dpif->handlers[i];
 
 #ifndef _WIN32
-        if (epoll_ctl(handler->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(socksp[i]),
+        if (epoll_ctl(handler->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(socksp),
                       &event) < 0) {
             error = errno;
             goto error;
         }
 #endif
-        dpif->handlers[i].channels[port_idx].sock = socksp[i];
-        dpif->handlers[i].channels[port_idx].last_poll = LLONG_MIN;
     }
+    dpif->channels[port_idx].sock = socksp;
+    dpif->channels[port_idx].last_poll = LLONG_MIN;
 
     return 0;
 
 error:
-    for (j = 0; j < i; j++) {
 #ifndef _WIN32
-        epoll_ctl(dpif->handlers[j].epoll_fd, EPOLL_CTL_DEL,
-                  nl_sock_fd(socksp[j]), NULL);
-#endif
-        dpif->handlers[j].channels[port_idx].sock = NULL;
+    while (i--) {
+        epoll_ctl(dpif->handlers[i].epoll_fd, EPOLL_CTL_DEL,
+                  nl_sock_fd(socksp), NULL);
     }
+#endif
+    dpif->channels[port_idx].sock = NULL;
 
     return error;
 }
@@ -618,14 +490,8 @@ vport_del_channels(struct dpif_netlink *dpif, odp_port_t port_no)
     uint32_t port_idx = odp_to_u32(port_no);
     size_t i;
 
-    if (!dpif->handlers || port_idx >= dpif->uc_array_size) {
-        return;
-    }
-
-    /* Since the sock can only be assigned in either all or none
-     * of "dpif->handlers" channels, the following check would
-     * suffice. */
-    if (!dpif->handlers[0].channels[port_idx].sock) {
+    if (!dpif->handlers || port_idx >= dpif->uc_array_size
+        || !dpif->channels[port_idx].sock) {
         return;
     }
 
@@ -633,12 +499,14 @@ vport_del_channels(struct dpif_netlink *dpif, odp_port_t port_no)
         struct dpif_handler *handler = &dpif->handlers[i];
 #ifndef _WIN32
         epoll_ctl(handler->epoll_fd, EPOLL_CTL_DEL,
-                  nl_sock_fd(handler->channels[port_idx].sock), NULL);
-        nl_sock_destroy(handler->channels[port_idx].sock);
+                  nl_sock_fd(dpif->channels[port_idx].sock), NULL);
 #endif
-        handler->channels[port_idx].sock = NULL;
         handler->event_offset = handler->n_events = 0;
     }
+#ifndef _WIN32
+    nl_sock_destroy(dpif->channels[port_idx].sock);
+#endif
+    dpif->channels[port_idx].sock = NULL;
 }
 
 static void
@@ -655,10 +523,7 @@ destroy_all_channels(struct dpif_netlink *dpif)
         struct dpif_netlink_vport vport_request;
         uint32_t upcall_pids = 0;
 
-        /* Since the sock can only be assigned in either all or none
-         * of "dpif->handlers" channels, the following check would
-         * suffice. */
-        if (!dpif->handlers[0].channels[i].sock) {
+        if (!dpif->channels[i].sock) {
             continue;
         }
 
@@ -679,11 +544,11 @@ destroy_all_channels(struct dpif_netlink *dpif)
 
         dpif_netlink_handler_uninit(handler);
         free(handler->epoll_events);
-        free(handler->channels);
     }
-
+    free(dpif->channels);
     free(dpif->handlers);
     dpif->handlers = NULL;
+    dpif->channels = NULL;
     dpif->n_handlers = 0;
     dpif->uc_array_size = 0;
 }
@@ -846,13 +711,12 @@ dpif_netlink_port_add__(struct dpif_netlink *dpif, const char *name,
 {
     struct dpif_netlink_vport request, reply;
     struct ofpbuf *buf;
-    struct nl_sock **socksp = NULL;
-    uint32_t *upcall_pids;
+    struct nl_sock *socksp = NULL;
+    uint32_t upcall_pids;
     int error = 0;
 
     if (dpif->handlers) {
-        socksp = vport_create_socksp(dpif, &error);
-        if (!socksp) {
+        if (nl_sock_create(NETLINK_GENERIC, &socksp)) {
             return error;
         }
     }
@@ -864,9 +728,9 @@ dpif_netlink_port_add__(struct dpif_netlink *dpif, const char *name,
     request.name = name;
 
     request.port_no = *port_nop;
-    upcall_pids = vport_socksp_to_pids(socksp, dpif->n_handlers);
-    request.n_upcall_pids = socksp ? dpif->n_handlers : 1;
-    request.upcall_pids = upcall_pids;
+    upcall_pids = nl_sock_pid(socksp);
+    request.n_upcall_pids = 1;
+    request.upcall_pids = &upcall_pids;
 
     if (options) {
         request.options = options->data;
@@ -882,31 +746,27 @@ dpif_netlink_port_add__(struct dpif_netlink *dpif, const char *name,
                       dpif_name(&dpif->dpif), *port_nop);
         }
 
-        vport_del_socksp(dpif, socksp);
+        nl_sock_destroy(socksp);
         goto exit;
     }
 
-    if (socksp) {
-        error = vport_add_channels(dpif, *port_nop, socksp);
-        if (error) {
-            VLOG_INFO("%s: could not add channel for port %s",
-                      dpif_name(&dpif->dpif), name);
-
-            /* Delete the port. */
-            dpif_netlink_vport_init(&request);
-            request.cmd = OVS_VPORT_CMD_DEL;
-            request.dp_ifindex = dpif->dp_ifindex;
-            request.port_no = *port_nop;
-            dpif_netlink_vport_transact(&request, NULL, NULL);
-            vport_del_socksp(dpif, socksp);
-            goto exit;
-        }
+    error = vport_add_channel(dpif, *port_nop, socksp);
+    if (error) {
+        VLOG_INFO("%s: could not add channel for port %s",
+                    dpif_name(&dpif->dpif), name);
+
+        /* Delete the port. */
+        dpif_netlink_vport_init(&request);
+        request.cmd = OVS_VPORT_CMD_DEL;
+        request.dp_ifindex = dpif->dp_ifindex;
+        request.port_no = *port_nop;
+        dpif_netlink_vport_transact(&request, NULL, NULL);
+        nl_sock_destroy(socksp);
+        goto exit;
     }
-    free(socksp);
 
 exit:
     ofpbuf_delete(buf);
-    free(upcall_pids);
 
     return error;
 }
@@ -1131,7 +991,7 @@ dpif_netlink_port_query_by_name(const struct dpif *dpif_, const char *devname,
 
 static uint32_t
 dpif_netlink_port_get_pid__(const struct dpif_netlink *dpif,
-                            odp_port_t port_no, uint32_t hash)
+                            odp_port_t port_no, uint32_t hash OVS_UNUSED)
     OVS_REQ_RDLOCK(dpif->upcall_lock)
 {
     uint32_t port_idx = odp_to_u32(port_no);
@@ -1141,14 +1001,13 @@ dpif_netlink_port_get_pid__(const struct dpif_netlink *dpif,
         /* The ODPP_NONE "reserved" port number uses the "ovs-system"'s
          * channel, since it is not heavily loaded. */
         uint32_t idx = port_idx >= dpif->uc_array_size ? 0 : port_idx;
-        struct dpif_handler *h = &dpif->handlers[hash % dpif->n_handlers];
 
         /* Needs to check in case the socket pointer is changed in between
          * the holding of upcall_lock.  A known case happens when the main
          * thread deletes the vport while the handler thread is handling
          * the upcall from that port. */
-        if (h->channels[idx].sock) {
-            pid = nl_sock_pid(h->channels[idx].sock);
+        if (dpif->channels[idx].sock) {
+            pid = nl_sock_pid(dpif->channels[idx].sock);
         }
     }
 
@@ -2382,42 +2241,40 @@ dpif_netlink_refresh_channels(struct dpif_netlink *dpif, uint32_t n_handlers)
     dpif_netlink_port_dump_start__(dpif, &dump);
     while (!dpif_netlink_port_dump_next__(dpif, &dump, &vport, &buf)) {
         uint32_t port_no = odp_to_u32(vport.port_no);
-        uint32_t *upcall_pids = NULL;
+        uint32_t upcall_pid;
         int error;
 
         if (port_no >= dpif->uc_array_size
-            || !vport_get_pids(dpif, port_no, &upcall_pids)) {
-            struct nl_sock **socksp = vport_create_socksp(dpif, &error);
+            || !vport_get_pid(dpif, port_no, &upcall_pid)) {
+            struct nl_sock *socksp;
 
-            if (!socksp) {
+            if (nl_sock_create(NETLINK_GENERIC, &socksp)) {
                 goto error;
             }
 
-            error = vport_add_channels(dpif, vport.port_no, socksp);
+            error = vport_add_channel(dpif, vport.port_no, socksp);
             if (error) {
                 VLOG_INFO("%s: could not add channels for port %s",
                           dpif_name(&dpif->dpif), vport.name);
-                vport_del_socksp(dpif, socksp);
+                nl_sock_destroy(socksp);
                 retval = error;
                 goto error;
             }
-            upcall_pids = vport_socksp_to_pids(socksp, dpif->n_handlers);
-            free(socksp);
+            upcall_pid = nl_sock_pid(socksp);
         }
 
         /* Configure the vport to deliver misses to 'sock'. */
         if (vport.upcall_pids[0] == 0
-            || vport.n_upcall_pids != dpif->n_handlers
-            || memcmp(upcall_pids, vport.upcall_pids, n_handlers * sizeof
-                      *upcall_pids)) {
+            || vport.n_upcall_pids != 1
+            || upcall_pid != vport.upcall_pids[0]) {
             struct dpif_netlink_vport vport_request;
 
             dpif_netlink_vport_init(&vport_request);
             vport_request.cmd = OVS_VPORT_CMD_SET;
             vport_request.dp_ifindex = dpif->dp_ifindex;
             vport_request.port_no = vport.port_no;
-            vport_request.n_upcall_pids = dpif->n_handlers;
-            vport_request.upcall_pids = upcall_pids;
+            vport_request.n_upcall_pids = 1;
+            vport_request.upcall_pids = &upcall_pid;
             error = dpif_netlink_vport_transact(&vport_request, NULL, NULL);
             if (error) {
                 VLOG_WARN_RL(&error_rl,
@@ -2438,11 +2295,9 @@ dpif_netlink_refresh_channels(struct dpif_netlink *dpif, uint32_t n_handlers)
         if (port_no < keep_channels_nbits) {
             bitmap_set1(keep_channels, port_no);
         }
-        free(upcall_pids);
         continue;
 
     error:
-        free(upcall_pids);
         vport_del_channels(dpif, vport.port_no);
     }
     nl_dump_done(&dump);
@@ -2701,7 +2556,7 @@ dpif_netlink_recv__(struct dpif_netlink *dpif, uint32_t handler_id,
 
     while (handler->event_offset < handler->n_events) {
         int idx = handler->epoll_events[handler->event_offset].data.u32;
-        struct dpif_channel *ch = &dpif->handlers[handler_id].channels[idx];
+        struct dpif_channel *ch = &dpif->channels[idx];
 
         handler->event_offset++;
 
@@ -2803,16 +2658,14 @@ dpif_netlink_recv_purge__(struct dpif_netlink *dpif)
     OVS_REQ_WRLOCK(dpif->upcall_lock)
 {
     if (dpif->handlers) {
-        size_t i, j;
+        size_t i;
 
+        if (!dpif->channels[0].sock) {
+            return;
+        }
         for (i = 0; i < dpif->uc_array_size; i++ ) {
-            if (!dpif->handlers[0].channels[i].sock) {
-                continue;
-            }
 
-            for (j = 0; j < dpif->n_handlers; j++) {
-                nl_sock_drain(dpif->handlers[j].channels[i].sock);
-            }
+            nl_sock_drain(dpif->channels[i].sock);
         }
     }
 }
-- 
2.17.1