Blob Blame History Raw
diff --git a/cib/callbacks.c b/cib/callbacks.c
index 754e218..77853d9 100644
--- a/cib/callbacks.c
+++ b/cib/callbacks.c
@@ -1391,7 +1391,6 @@ initiate_exit(void)
 
 extern int remote_fd;
 extern int remote_tls_fd;
-extern void terminate_cs_connection(void);
 
 void
 terminate_cib(const char *caller, gboolean fast)
diff --git a/cib/main.c b/cib/main.c
index 6b56274..3328558 100644
--- a/cib/main.c
+++ b/cib/main.c
@@ -371,15 +371,25 @@ ccm_connect(void)
 #endif
 
 #if SUPPORT_COROSYNC
-static gboolean
-cib_ais_dispatch(int kind, const char *from, const char *data)
+static void
+cib_cs_dispatch(cpg_handle_t handle,
+                 const struct cpg_name *groupName,
+                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 {
+    uint32_t kind = 0;
     xmlNode *xml = NULL;
+    const char *from = NULL;
+    char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
 
+    if(data == NULL) {
+        return;
+    }
     if (kind == crm_class_cluster) {
         xml = string2xml(data);
         if (xml == NULL) {
-            goto bail;
+            crm_err("Invalid XML: '%.120s'", data);
+            free(data);
+            return;
         }
         crm_xml_add(xml, F_ORIG, from);
         /* crm_xml_add_int(xml, F_SEQ, wrapper->id); */
@@ -387,16 +397,11 @@ cib_ais_dispatch(int kind, const char *from, const char *data)
     }
 
     free_xml(xml);
-    return TRUE;
-
-  bail:
-    crm_err("Invalid XML: '%.120s'", data);
-    return TRUE;
-
+    free(data);
 }
 
 static void
-cib_ais_destroy(gpointer user_data)
+cib_cs_destroy(gpointer user_data)
 {
     if (cib_shutdown_flag) {
         crm_info("Corosync disconnection complete");
@@ -463,8 +468,9 @@ cib_init(void)
 {
     if (is_openais_cluster()) {
 #if SUPPORT_COROSYNC
-        crm_cluster.destroy = cib_ais_destroy;
-        crm_cluster.cs_dispatch = cib_ais_dispatch;
+        crm_cluster.destroy = cib_cs_destroy;
+        crm_cluster.cpg.cpg_deliver_fn = cib_cs_dispatch;
+        crm_cluster.cpg.cpg_confchg_fn = pcmk_cpg_membership;
 #endif
     } else if (is_heartbeat_cluster()) {
 #if SUPPORT_HEARTBEAT
diff --git a/configure.ac b/configure.ac
index be8261a..7d2e384 100644
--- a/configure.ac
+++ b/configure.ac
@@ -132,7 +132,7 @@ try_extract_header_define() {
 	      AC_MSG_RESULT($value)
 	  fi
 	  printf $value
-	  rm -rf ${Cfile}.cc ${Cfile} ${Cfile}.dSYM ${Cfile}.gcno
+	  rm -rf ${Cfile}.c ${Cfile} ${Cfile}.dSYM ${Cfile}.gcno
 	}
 
 extract_header_define() {
@@ -669,14 +669,6 @@ else
 fi
 AC_MSG_RESULT(using $GLIBCONFIG)
 
-if
-    $PKGCONFIG --exists systemd
-then
-    systemdunitdir=`$PKGCONFIG --variable=systemdsystemunitdir systemd`
-    AC_SUBST(systemdunitdir)
-fi
-AM_CONDITIONAL(HAVE_SYSTEMD, test -n "$systemdunitdir" -a "x$systemdunitdir" != xno)
-
 #
 #	Where is dlopen?
 #
@@ -965,50 +957,37 @@ dnl ========================================================================
 dnl    Profiling and GProf
 dnl ========================================================================
 
-case $SUPPORT_PROFILING in
+case $SUPPORT_GCOV in
      1|yes|true)
 	SUPPORT_PROFILING=1
-
-        dnl Enable gprof
-	#LIBS="$LIBS -pg"
-	#CFLAGS="$CFLAGS -pg"
-
-	dnl Disable various compiler optimizations
-	CFLAGS="$CFLAGS -fno-omit-frame-pointer"
-	#CFLAGS="$CFLAGS -fno-inline-functions -fno-inline-functions-called-once -fno-optimize-sibling-calls"
-	dnl CFLAGS="$CFLAGS -fno-default-inline -fno-inline"
-
-	dnl Update features
-	PCMK_FEATURES="$PCMK_FEATURES gprof"
 	;;
-     *) SUPPORT_PROFILING=0;;
 esac
-AC_DEFINE_UNQUOTED(SUPPORT_PROFILING, $SUPPORT_PROFILING, Support for gprof profiling)
 
-case $SUPPORT_GCOV in
+case $SUPPORT_PROFILING in
      1|yes|true)
-	SUPPORT_GCOV=1
+	SUPPORT_PROFILING=1
 
         dnl Enable gprof
 	#LIBS="$LIBS -pg"
 	#CFLAGS="$CFLAGS -pg"
 
 	dnl Disable various compiler optimizations
-	CFLAGS="$CFLAGS -fprofile-arcs -ftest-coverage -fno-inline"
+	CFLAGS="$CFLAGS -fno-omit-frame-pointer -fprofile-arcs -ftest-coverage -fno-inline"
+	#CFLAGS="$CFLAGS -fno-inline-functions -fno-inline-functions-called-once -fno-optimize-sibling-calls"
+	dnl CFLAGS="$CFLAGS -fno-default-inline -fno-inline"
 
-	dnl Turn off optimization so code coverage tool
-	dnl can get accurate line numbers
+	dnl Turn off optimization so code coverage tool can get accurate line numbers
 	AC_MSG_NOTICE(Old CFLAGS: $CFLAGS)
-	CFLAGS=`echo $CFLAGS | sed -e 's/-O.\ //g' -e 's/-Wp,-D_FORTIFY_SOURCE=.\ //g'`
+	CFLAGS=`echo $CFLAGS | sed -e 's/-O.\ //g' -e 's/-Wp,-D_FORTIFY_SOURCE=.\ //g' -e 's/-D_FORTIFY_SOURCE=.\ //g'`
 	CFLAGS="$CFLAGS -O0"
 	AC_MSG_NOTICE(New CFLAGS: $CFLAGS)
 
 	dnl Update features
-	PCMK_FEATURES="$PCMK_FEATURES gcov"
+	PCMK_FEATURES="$PCMK_FEATURES profile"
 	;;
      *) SUPPORT_PROFILING=0;;
 esac
-AC_DEFINE_UNQUOTED(SUPPORT_GCOV, $SUPPORT_GCOV, Support for gcov coverage testing)
+AC_DEFINE_UNQUOTED(SUPPORT_PROFILING, $SUPPORT_PROFILING, Support for profiling)
 
 dnl ========================================================================
 dnl    Cluster infrastructure - Heartbeat / LibQB
@@ -1192,14 +1171,25 @@ fi
 AC_DEFINE_UNQUOTED(SUPPORT_UPSTART, $HAVE_upstart, Support upstart based system services)
 AM_CONDITIONAL(BUILD_UPSTART, test $HAVE_upstart = 1)
 
+if
+    $PKGCONFIG --exists systemd
+then
+    systemdunitdir=`$PKGCONFIG --variable=systemdsystemunitdir systemd`
+    AC_SUBST(systemdunitdir)
+else
+    enable_systemd=no
+fi
+
 if test $HAVE_gio = 1 -a "x${enable_systemd}" != xno; then
-   HAVE_systemd=1
-   PCMK_FEATURES="$PCMK_FEATURES systemd"
+   if test -n "$systemdunitdir" -a "x$systemdunitdir" != xno; then
+      HAVE_systemd=1
+      PCMK_FEATURES="$PCMK_FEATURES systemd"
+   fi
 fi
+
 AC_DEFINE_UNQUOTED(SUPPORT_SYSTEMD, $HAVE_systemd, Support systemd based system services)
 AM_CONDITIONAL(BUILD_SYSTEMD, test $HAVE_systemd = 1)
 
-
 case $SUPPORT_NAGIOS in
      1|yes|true|try)
         SUPPORT_NAGIOS=1;;
diff --git a/crmd/control.c b/crmd/control.c
index 7f423db..0808f56 100644
--- a/crmd/control.c
+++ b/crmd/control.c
@@ -915,7 +915,7 @@ config_query_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void
     if (is_classic_ais_cluster()) {
         value = crmd_pref(config_hash, XML_ATTR_EXPECTED_VOTES);
         crm_debug("Sending expected-votes=%s to corosync", value);
-        send_ais_text(crm_class_quorum, value, TRUE, NULL, crm_msg_ais);
+        send_cluster_text(crm_class_quorum, value, TRUE, NULL, crm_msg_ais);
     }
 #endif
 
diff --git a/crmd/corosync.c b/crmd/corosync.c
index 6385780..c4aef38 100644
--- a/crmd/corosync.c
+++ b/crmd/corosync.c
@@ -41,8 +41,10 @@ extern void crmd_ha_connection_destroy(gpointer user_data);
 /*	 A_HA_CONNECT	*/
 #if SUPPORT_COROSYNC
 
-static gboolean
-crmd_ais_dispatch(int kind, const char *from, const char *data)
+static void
+crmd_cs_dispatch(cpg_handle_t handle,
+                         const struct cpg_name *groupName,
+                         uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 {
     int seq = 0;
     xmlNode *xml = NULL;
@@ -50,10 +52,18 @@ crmd_ais_dispatch(int kind, const char *from, const char *data)
     crm_node_t *peer = NULL;
     enum crm_proc_flag flag = crm_proc_cpg;
 
+    uint32_t kind = 0;
+    const char *from = NULL;
+    char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
+
+    if(data == NULL) {
+        return;
+    }
     xml = string2xml(data);
     if (xml == NULL) {
         crm_err("Could not parse message content (%d): %.100s", kind, data);
-        return TRUE;
+        free(data);
+        return;
     }
 
     switch (kind) {
@@ -103,8 +113,8 @@ crmd_ais_dispatch(int kind, const char *from, const char *data)
                 /* If we can still talk to our peer process on that node,
                  * then its also part of the corosync membership
                  */
-                crm_err("Recieving messages from a node we think is dead: %s[%d]", peer->uname,
-                        peer->id);
+                crm_warn("Recieving messages from a node we think is dead: %s[%d]", peer->uname,
+                         peer->id);
                 crm_update_peer_proc(__FUNCTION__, peer, flag, ONLINESTATUS);
             }
             crmd_ha_msg_filter(xml);
@@ -123,8 +133,8 @@ crmd_ais_dispatch(int kind, const char *from, const char *data)
             crm_err("Invalid message class (%d): %.100s", kind, data);
     }
 
+    free(data);
     free_xml(xml);
-    return TRUE;
 }
 
 static gboolean
@@ -148,7 +158,7 @@ crmd_quorum_destroy(gpointer user_data)
 }
 
 static void
-crmd_ais_destroy(gpointer user_data)
+crmd_cs_destroy(gpointer user_data)
 {
     if (is_not_set(fsa_input_register, R_HA_DISCONNECTED)) {
         crm_err("connection terminated");
@@ -182,8 +192,9 @@ crm_connect_corosync(crm_cluster_t * cluster)
 
     if (is_openais_cluster()) {
         crm_set_status_callback(&peer_update_callback);
-        cluster->cs_dispatch = crmd_ais_dispatch;
-        cluster->destroy = crmd_ais_destroy;
+        cluster->cpg.cpg_deliver_fn = crmd_cs_dispatch;
+        cluster->cpg.cpg_confchg_fn = pcmk_cpg_membership;
+        cluster->destroy = crmd_cs_destroy;
 
         rc = crm_cluster_connect(cluster);
     }
diff --git a/crmd/election.c b/crmd/election.c
index 1946858..25cb647 100644
--- a/crmd/election.c
+++ b/crmd/election.c
@@ -518,7 +518,7 @@ do_dc_takeover(long long action,
 
 #if SUPPORT_COROSYNC
     if (is_classic_ais_cluster()) {
-        send_ais_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais);
+        send_cluster_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais);
     }
 #endif
 
diff --git a/crmd/lrm.c b/crmd/lrm.c
index 31f00d7..15bad88 100644
--- a/crmd/lrm.c
+++ b/crmd/lrm.c
@@ -1929,6 +1929,7 @@ do_update_resource(lrm_state_t * lrm_state, lrmd_rsc_info_t * rsc, lrmd_event_da
 
     } else {
         crm_warn("Resource %s no longer exists in the lrmd", op->rsc_id);
+        send_direct_ack(NULL, NULL, rsc, op, op->rsc_id);
         goto cleanup;
     }
 
diff --git a/doc/Pacemaker_Remote/en-US/Revision_History.xml b/doc/Pacemaker_Remote/en-US/Revision_History.xml
index 26d8ab6..257ecbd 100644
--- a/doc/Pacemaker_Remote/en-US/Revision_History.xml
+++ b/doc/Pacemaker_Remote/en-US/Revision_History.xml
@@ -8,13 +8,13 @@
 	<simpara>
 		<revhistory>
 			<revision>
-			  <revnumber>1</revnumber>
+			  <revnumber>1-0</revnumber>
 			  <date>Tue Mar 19 2013</date>
 			  <author><firstname>David</firstname><surname>Vossel</surname><email>dvossel@redhat.com</email></author>
 			  <revdescription><simplelist><member>Import from Pages.app</member></simplelist></revdescription>
 			</revision>
 			<revision>
-			  <revnumber>2</revnumber>
+			  <revnumber>2-0</revnumber>
 			  <date>Tue May 13 2013</date>
 			  <author><firstname>David</firstname><surname>Vossel</surname><email>dvossel@redhat.com</email></author>
 			  <revdescription><simplelist><member>Added Future Features Section</member></simplelist></revdescription>
diff --git a/extra/cluster-init b/extra/cluster-init
index 5dc71c2..fe0ff61 100755
--- a/extra/cluster-init
+++ b/extra/cluster-init
@@ -294,10 +294,10 @@ esac
 
 case $DATE in
     [Yy][Ee][Ss]|[Yy])
-	now=`date`
 	for host in $host_list; do
 	    echo "Setting time on ${host}"
 	    scp /etc/localtime root@${host}:/etc
+	    now=`date`
 	    ssh -l root ${host} -- date -s "'$now'"
 	    echo ""
 	done
diff --git a/fencing/fence_dummy b/fencing/fence_dummy
index b202977..8cf5103 100644
--- a/fencing/fence_dummy
+++ b/fencing/fence_dummy
@@ -5,7 +5,7 @@
 # Virsh 0.3.3 on RHEL 5.2 with xen-3.0.3-51
 #
 
-import sys, time, random
+import sys, time, random, os, atexit, getopt, re
 
 #BEGIN_VERSION_GENERATION
 RELEASE_VERSION="3.1.6"
@@ -42,14 +42,28 @@ all_opt = {
 	"debug" : {
 		"getopt" : "D:",
 		"longopt" : "debug-file", 
-		"help" : "-D, --debug-file=<debugfile>   Debugging to output file",
+		"help" : "-D, --debug-file=[debugfile]   Debugging to output file",
 		"required" : "0",
 		"shortdesc" : "Write debug information to given file",
 		"order" : 52 },
+	"random_sleep_range": {
+		"getopt" : "R:",
+		"required" : "0",
+		"longopt" : "random_sleep_range",
+		"help" : "--random_sleep-range=[seconds] Issue a sleep between 1 and [seconds]. Used for testing.",
+		"shortdesc" : "Issue a sleep between 1 and [seconds]",
+		"order" : 1 },
+	"mode": {
+		"getopt" : "M:",
+		"longopt" : "mode",
+		"required" : "0",
+		"help" : "--mode=(pass|fail|random). Used for testing.",
+		"shortdesc" : "Should operations always pass, always fail or fail at random",
+		"order" : 1 },
 	"delay" : {
 		"getopt" : "f:",
 		"longopt" : "delay",
-		"help" : "--delay <seconds>              Wait X seconds before fencing is started",
+		"help" : "--delay [seconds]              Wait X seconds before fencing is started",
 		"required" : "0",
 		"shortdesc" : "Wait X seconds before fencing is started",
 		"default" : "0",
@@ -57,7 +71,7 @@ all_opt = {
 	"action" : {
 		"getopt" : "o:",
 		"longopt" : "action",
-		"help" : "-o, --action=<action>          Action: status, reboot (default), off or on",
+		"help" : "-o, --action=[action]          Action: status, reboot (default), off or on",
 		"required" : "1",
 		"shortdesc" : "Fencing Action",
 		"default" : "reboot",
@@ -65,7 +79,7 @@ all_opt = {
 	"port" : {
 		"getopt" : "n:",
 		"longopt" : "plug",
-		"help" : "-n, --plug=<id>                Physical plug number on device or\n" + 
+		"help" : "-n, --plug=[id]                Physical plug number on device or\n" + 
                 "                                        name of virtual machine",
 		"required" : "1",
 		"shortdesc" : "Physical plug number or name of virtual machine",
@@ -73,7 +87,7 @@ all_opt = {
 	"switch" : {
 		"getopt" : "s:",
 		"longopt" : "switch",
-		"help" : "-s, --switch=<id>              Physical switch number on device",
+		"help" : "-s, --switch=[id]              Physical switch number on device",
 		"required" : "0",
 		"shortdesc" : "Physical switch number on device",
 		"order" : 1 },
@@ -86,8 +100,6 @@ all_opt = {
 		"order" : 1}
 }
 
-common_opt = [ "retry_on", "delay" ]
-
 def show_docs(options, docs = None):
 	device_opt = options["device_opt"]
 
@@ -189,12 +201,6 @@ def metadata(avail_opt, options, docs):
 
 def process_input(avail_opt):
 	global all_opt
-	global common_opt
-
-	##
-	## Add options which are available for every fence agent
-	#####
-	avail_opt.extend(common_opt)
 
 	##
 	## Set standard environment
@@ -290,24 +296,11 @@ def atexit_handler():
 		os.close(1)
 	except IOError:
 		sys.stderr.write("%s failed to close standard output\n"%(sys.argv[0]))
-		sys.exit(EC_GENERIC_ERROR)
+		sys.exit(1)
 
 def main():
     global all_opt
-    device_opt = [  "help", "version", "verbose", "debug", "action", "port",
-            "power_timeout", "random_sleep_range"]
-
-    all_opt["random_sleep_range"] = {
-        "getopt" : "R:",
-        "longopt" : "random_sleep_range",
-        "help" : "--random_sleep-range=<seconds>Issue a sleep between 1 and <seconds>. Used for testing.",
-        "order" : 1 }
-
-    all_opt["mode"] = {
-        "getopt" : "M:",
-        "longopt" : "mode",
-        "help" : "--mode=(pass|fail|random). Used for testing.",
-        "order" : 1 }
+    device_opt = [ "help", "version", "verbose", "debug", "action", "port", "mode", "random_sleep_range"]
 
     ## Defaults for fence agent
     docs = { }
@@ -316,6 +309,7 @@ def main():
 
     atexit.register(atexit_handler)
     options = process_input(device_opt)
+    options["device_opt"] = device_opt
     show_docs(options, docs)
 
     # random sleep for testing
diff --git a/fencing/main.c b/fencing/main.c
index c7b67a1..fee9f7a 100644
--- a/fencing/main.c
+++ b/fencing/main.c
@@ -190,15 +190,25 @@ stonith_peer_hb_destroy(gpointer user_data)
 #endif
 
 #if SUPPORT_COROSYNC
-static gboolean
-stonith_peer_ais_callback(int kind, const char *from, const char *data)
+static void
+stonith_peer_ais_callback(cpg_handle_t handle,
+                          const struct cpg_name *groupName,
+                          uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 {
+    uint32_t kind = 0;
     xmlNode *xml = NULL;
+    const char *from = NULL;
+    char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
 
+    if(data == NULL) {
+        return;
+    }
     if (kind == crm_class_cluster) {
         xml = string2xml(data);
         if (xml == NULL) {
-            goto bail;
+            crm_err("Invalid XML: '%.120s'", data);
+            free(data);
+            return;
         }
         crm_xml_add(xml, F_ORIG, from);
         /* crm_xml_add_int(xml, F_SEQ, wrapper->id); */
@@ -206,18 +216,14 @@ stonith_peer_ais_callback(int kind, const char *from, const char *data)
     }
 
     free_xml(xml);
-    return TRUE;
-
-  bail:
-    crm_err("Invalid XML: '%.120s'", data);
-    return TRUE;
-
+    free(data);
+    return;
 }
 
 static void
-stonith_peer_ais_destroy(gpointer user_data)
+stonith_peer_cs_destroy(gpointer user_data)
 {
-    crm_err("AIS connection terminated");
+    crm_err("Corosync connection terminated");
     stonith_shutdown(0);
 }
 #endif
@@ -1084,8 +1090,9 @@ main(int argc, char **argv)
 
         if (is_openais_cluster()) {
 #if SUPPORT_COROSYNC
-            cluster.destroy = stonith_peer_ais_destroy;
-            cluster.cs_dispatch = stonith_peer_ais_callback;
+            cluster.destroy = stonith_peer_cs_destroy;
+            cluster.cpg.cpg_deliver_fn = stonith_peer_ais_callback;
+            cluster.cpg.cpg_confchg_fn = pcmk_cpg_membership;
 #endif
         }
 
diff --git a/include/crm/cluster.h b/include/crm/cluster.h
index cac863f..c999367 100644
--- a/include/crm/cluster.h
+++ b/include/crm/cluster.h
@@ -26,9 +26,12 @@
 #    include <ocf/oc_event.h>
 #  endif
 
+#  if SUPPORT_COROSYNC
+#    include <corosync/cpg.h>
+#  endif
+
 extern gboolean crm_have_quorum;
 extern GHashTable *crm_peer_cache;
-extern GHashTable *crm_peer_id_cache;
 extern unsigned long long crm_peer_seq;
 
 #  ifndef CRM_SERVICE
@@ -73,21 +76,24 @@ typedef struct crm_peer_node_s {
 
 void crm_peer_init(void);
 void crm_peer_destroy(void);
-char *get_corosync_uuid(crm_node_t *peer);
-int get_corosync_id(int id, const char *uuid);
 
 typedef struct crm_cluster_s {
     char *uuid;
     char *uname;
     uint32_t nodeid;
 
+    void (*destroy) (gpointer);
+
 #  if SUPPORT_HEARTBEAT
     ll_cluster_t *hb_conn;
     void (*hb_dispatch) (HA_Message * msg, void *private);
 #  endif
 
-     gboolean(*cs_dispatch) (int kind, const char *from, const char *data);
-    void (*destroy) (gpointer);
+#  if SUPPORT_COROSYNC
+    struct cpg_name group;
+    cpg_callbacks_t cpg;
+    cpg_handle_t cpg_handle;
+#  endif
 
 } crm_cluster_t;
 
@@ -122,8 +128,6 @@ enum crm_ais_msg_types {
 gboolean send_cluster_message(crm_node_t * node, enum crm_ais_msg_types service,
                               xmlNode * data, gboolean ordered);
 
-void destroy_crm_node(gpointer /* crm_node_t* */ data);
-
 crm_node_t *crm_get_peer(unsigned int id, const char *uname);
 
 guint crm_active_peers(void);
@@ -138,8 +142,18 @@ gboolean crm_is_heartbeat_peer_active(const crm_node_t * node);
 
 #  if SUPPORT_COROSYNC
 extern int ais_fd_sync;
+uint32_t get_local_nodeid(cpg_handle_t handle);
+
+gboolean cluster_connect_cpg(crm_cluster_t *cluster);
+void cluster_disconnect_cpg(crm_cluster_t * cluster);
+
+void pcmk_cpg_membership(cpg_handle_t handle,
+                         const struct cpg_name *groupName,
+                         const struct cpg_address *member_list, size_t member_list_entries,
+                         const struct cpg_address *left_list, size_t left_list_entries,
+                         const struct cpg_address *joined_list, size_t joined_list_entries);
 gboolean crm_is_corosync_peer_active(const crm_node_t * node);
-gboolean send_ais_text(int class, const char *data, gboolean local,
+gboolean send_cluster_text(int class, const char *data, gboolean local,
                        crm_node_t * node, enum crm_ais_msg_types dest);
 #  endif
 
@@ -180,4 +194,7 @@ gboolean is_heartbeat_cluster(void);
 const char *get_local_node_name(void);
 char *get_node_name(uint32_t nodeid);
 
+char *pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *msg,
+                        uint32_t *kind, const char **from);
+
 #endif
diff --git a/include/crm/cluster/internal.h b/include/crm/cluster/internal.h
index 2fa8e08..791a1f9 100644
--- a/include/crm/cluster/internal.h
+++ b/include/crm/cluster/internal.h
@@ -349,20 +349,24 @@ gboolean heartbeat_initialize_nodelist(void *cluster, gboolean force_member, xml
 
 #  if SUPPORT_COROSYNC
 
+gboolean send_cpg_iov(struct iovec * iov);
+
 #    if SUPPORT_PLUGIN
 char *classic_node_name(uint32_t nodeid);
+void plugin_handle_membership(AIS_Message *msg);
+bool send_plugin_text(int class, struct iovec *iov);
 #    else
 char *corosync_node_name(uint64_t /*cmap_handle_t */ cmap_handle, uint32_t nodeid);
 #    endif
 
 gboolean corosync_initialize_nodelist(void *cluster, gboolean force_member, xmlNode * xml_parent);
 
-gboolean send_ais_message(xmlNode * msg, gboolean local,
-                          crm_node_t * node, enum crm_ais_msg_types dest);
+gboolean send_cluster_message_cs(xmlNode * msg, gboolean local,
+                                 crm_node_t * node, enum crm_ais_msg_types dest);
 
 enum cluster_type_e find_corosync_variant(void);
 
-void terminate_cs_connection(void);
+void terminate_cs_connection(crm_cluster_t * cluster);
 gboolean init_cs_connection(crm_cluster_t * cluster);
 gboolean init_cs_connection_once(crm_cluster_t * cluster);
 #  endif
@@ -377,6 +381,8 @@ enum crm_quorum_source {
     crm_quorum_pacemaker,
 };
 
+int get_corosync_id(int id, const char *uuid);
+char *get_corosync_uuid(crm_node_t *peer);
 enum crm_quorum_source get_quorum_source(void);
 
 void crm_update_peer_proc(const char *source, crm_node_t * peer, uint32_t flag, const char *status);
diff --git a/lib/cluster/Makefile.am b/lib/cluster/Makefile.am
index a5a70ff..744ff27 100644
--- a/lib/cluster/Makefile.am
+++ b/lib/cluster/Makefile.am
@@ -33,6 +33,7 @@ libcrmcluster_la_LIBADD  = $(top_builddir)/lib/common/libcrmcommon.la $(top_buil
 libcrmcluster_la_DEPENDENCIES = $(top_builddir)/lib/common/libcrmcommon.la $(top_builddir)/lib/fencing/libstonithd.la 
 
 if BUILD_CS_SUPPORT
+libcrmcluster_la_SOURCES += cpg.c
 if BUILD_CS_PLUGIN
 libcrmcluster_la_SOURCES += legacy.c
 else
diff --git a/lib/cluster/cluster.c b/lib/cluster/cluster.c
index 9538816..5820c8d 100644
--- a/lib/cluster/cluster.c
+++ b/lib/cluster/cluster.c
@@ -240,7 +240,7 @@ crm_cluster_disconnect(crm_cluster_t * cluster)
 #if SUPPORT_COROSYNC
     if (is_openais_cluster()) {
         crm_peer_destroy();
-        terminate_cs_connection();
+        terminate_cs_connection(cluster);
         crm_info("Disconnected from %s", type_str);
         return;
     }
@@ -274,7 +274,7 @@ send_cluster_message(crm_node_t * node, enum crm_ais_msg_types service, xmlNode
 
 #if SUPPORT_COROSYNC
     if (is_openais_cluster()) {
-        return send_ais_message(data, FALSE, node, service);
+        return send_cluster_message_cs(data, FALSE, node, service);
     }
 #endif
 #if SUPPORT_HEARTBEAT
diff --git a/lib/cluster/corosync.c b/lib/cluster/corosync.c
index 83a0c78..5a64fe1 100644
--- a/lib/cluster/corosync.c
+++ b/lib/cluster/corosync.c
@@ -34,69 +34,16 @@
 #include <corosync/corodefs.h>
 #include <corosync/corotypes.h>
 #include <corosync/hdb.h>
-#include <corosync/cpg.h>
 #include <corosync/cfg.h>
 #include <corosync/cmap.h>
 #include <corosync/quorum.h>
 
 #include <crm/msg_xml.h>
 
-cpg_handle_t pcmk_cpg_handle = 0;
-
-struct cpg_name pcmk_cpg_group = {
-    .length = 0,
-    .value[0] = 0,
-};
-
 quorum_handle_t pcmk_quorum_handle = 0;
 
 gboolean(*quorum_app_callback) (unsigned long long seq, gboolean quorate) = NULL;
 
-#define cs_repeat(counter, max, code) do {		\
-	code;						\
-	if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {  \
-	    counter++;					\
-	    crm_debug("Retrying operation after %ds", counter);	\
-	    sleep(counter);				\
-	} else {                                        \
-            break;                                      \
-        }                                               \
-    } while(counter < max)
-
-static uint32_t get_local_nodeid(cpg_handle_t handle)
-{
-    int rc = CS_OK;
-    int retries = 0;
-    static uint32_t local_nodeid = 0;
-    cpg_handle_t local_handle = handle;
-    cpg_callbacks_t cb = { };
-
-    if(local_nodeid != 0) {
-        return local_nodeid;
-    }
-
-    if(handle == 0) {
-        crm_trace("Creating connection");
-        cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
-    }
-
-    if (rc == CS_OK) {
-        retries = 0;
-        crm_trace("Performing lookup");
-        cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
-    }
-
-    if (rc != CS_OK) {
-        crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
-    }
-    if(handle == 0) {
-        crm_trace("Closing connection");
-        cpg_finalize(local_handle);
-    }
-    crm_debug("Local nodeid is %u", local_nodeid);
-    return local_nodeid;
-}
-
 /*
  * CFG functionality stolen from node_name() in corosync-quorumtool.c
  * This resolves the first address assigned to a node and returns the name or IP address.
@@ -189,281 +136,12 @@ corosync_node_name(uint64_t /*cmap_handle_t */ cmap_handle, uint32_t nodeid)
     return name;
 }
 
-enum crm_ais_msg_types
-text2msg_type(const char *text)
-{
-    int type = crm_msg_none;
-
-    CRM_CHECK(text != NULL, return type);
-    if (safe_str_eq(text, "ais")) {
-        type = crm_msg_ais;
-    } else if (safe_str_eq(text, "crm_plugin")) {
-        type = crm_msg_ais;
-    } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
-        type = crm_msg_cib;
-    } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
-        type = crm_msg_crmd;
-    } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
-        type = crm_msg_crmd;
-    } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
-        type = crm_msg_te;
-    } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
-        type = crm_msg_pe;
-    } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
-        type = crm_msg_lrmd;
-    } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
-        type = crm_msg_stonithd;
-    } else if (safe_str_eq(text, "stonith-ng")) {
-        type = crm_msg_stonith_ng;
-    } else if (safe_str_eq(text, "attrd")) {
-        type = crm_msg_attrd;
-
-    } else {
-        /* This will normally be a transient client rather than
-         * a cluster daemon.  Set the type to the pid of the client
-         */
-        int scan_rc = sscanf(text, "%d", &type);
-
-        if (scan_rc != 1) {
-            /* Ensure its sane */
-            type = crm_msg_none;
-        }
-    }
-    return type;
-}
-
-GListPtr cs_message_queue = NULL;
-int cs_message_timer = 0;
-
-static ssize_t crm_cs_flush(void);
-
-static gboolean
-crm_cs_flush_cb(gpointer data)
-{
-    cs_message_timer = 0;
-    crm_cs_flush();
-    return FALSE;
-}
-
-#define CS_SEND_MAX 200
-static ssize_t
-crm_cs_flush(void)
-{
-    int sent = 0;
-    ssize_t rc = 0;
-    int queue_len = 0;
-    static unsigned int last_sent = 0;
-
-    if (pcmk_cpg_handle == 0) {
-        crm_trace("Connection is dead");
-        return pcmk_ok;
-    }
-
-    queue_len = g_list_length(cs_message_queue);
-    if ((queue_len % 1000) == 0 && queue_len > 1) {
-        crm_err("CPG queue has grown to %d", queue_len);
-
-    } else if (queue_len == CS_SEND_MAX) {
-        crm_warn("CPG queue has grown to %d", queue_len);
-    }
-
-    if (cs_message_timer) {
-        /* There is already a timer, wait until it goes off */
-        crm_trace("Timer active %d", cs_message_timer);
-        return pcmk_ok;
-    }
-
-    while (cs_message_queue && sent < CS_SEND_MAX) {
-        AIS_Message *header = NULL;
-        struct iovec *iov = cs_message_queue->data;
-
-        errno = 0;
-        rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, iov, 1);
-
-        if (rc != CS_OK) {
-            break;
-        }
-
-        sent++;
-        header = iov->iov_base;
-        last_sent = header->id;
-        if (header->compressed_size) {
-            crm_trace("CPG message %d (%d compressed bytes) sent",
-                      header->id, header->compressed_size);
-        } else {
-            crm_trace("CPG message %d (%d bytes) sent: %.200s",
-                      header->id, header->size, header->data);
-        }
-
-        cs_message_queue = g_list_remove(cs_message_queue, iov);
-        free(iov[0].iov_base);
-        free(iov);
-    }
-
-    queue_len -= sent;
-    if (sent > 1 || cs_message_queue) {
-        crm_info("Sent %d CPG messages  (%d remaining, last=%u): %s (%d)",
-                 sent, queue_len, last_sent, ais_error2text(rc), rc);
-    } else {
-        crm_trace("Sent %d CPG messages  (%d remaining, last=%u): %s (%d)",
-                  sent, queue_len, last_sent, ais_error2text(rc), rc);
-    }
-
-    if (cs_message_queue) {
-        uint32_t delay_ms = 100;
-        if(rc != CS_OK) {
-            /* Proportionally more if sending failed but cap at 1s */
-            delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
-        }
-        cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, NULL);
-    }
-
-    return rc;
-}
-
-gboolean
-send_ais_text(int class, const char *data,
-              gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
-{
-    static int msg_id = 0;
-    static int local_pid = 0;
-    static int local_name_len = 0;
-    static const char *local_name = NULL;
-
-    char *target = NULL;
-    struct iovec *iov;
-    AIS_Message *ais_msg = NULL;
-    enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
-
-    /* There are only 6 handlers registered to crm_lib_service in plugin.c */
-    CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
-              return FALSE);
-
-    CRM_CHECK(dest != crm_msg_ais, return FALSE);
-
-    if(local_name == NULL) {
-        local_name = get_local_node_name();
-    }
-    if(local_name_len == 0 && local_name) {
-        local_name_len = strlen(local_name);
-    }
-
-    if (data == NULL) {
-        data = "";
-    }
-
-    if (local_pid == 0) {
-        local_pid = getpid();
-    }
-
-    if (sender == crm_msg_none) {
-        sender = local_pid;
-    }
-
-    ais_msg = calloc(1, sizeof(AIS_Message));
-
-    ais_msg->id = msg_id++;
-    ais_msg->header.id = class;
-    ais_msg->header.error = CS_OK;
-
-    ais_msg->host.type = dest;
-    ais_msg->host.local = local;
-
-    if (node) {
-        if (node->uname) {
-            target = strdup(node->uname);
-            ais_msg->host.size = strlen(node->uname);
-            memset(ais_msg->host.uname, 0, MAX_NAME);
-            memcpy(ais_msg->host.uname, node->uname, ais_msg->host.size);
-        } else {
-            target = g_strdup_printf("%u", node->id);
-        }
-        ais_msg->host.id = node->id;
-    } else {
-        target = strdup("all");
-    }
-
-    ais_msg->sender.id = 0;
-    ais_msg->sender.type = sender;
-    ais_msg->sender.pid = local_pid;
-    ais_msg->sender.size = local_name_len;
-    memset(ais_msg->sender.uname, 0, MAX_NAME);
-    memcpy(ais_msg->sender.uname, local_name, ais_msg->sender.size);
-
-    ais_msg->size = 1 + strlen(data);
-    ais_msg->header.size = sizeof(AIS_Message) + ais_msg->size;
-
-    if (ais_msg->size < CRM_BZ2_THRESHOLD) {
-        ais_msg = realloc(ais_msg, ais_msg->header.size);
-        memcpy(ais_msg->data, data, ais_msg->size);
-
-    } else {
-        char *compressed = NULL;
-        unsigned int new_size = 0;
-        char *uncompressed = strdup(data);
-
-        if (crm_compress_string(uncompressed, ais_msg->size, 0, &compressed, &new_size)) {
-
-            ais_msg->header.size = sizeof(AIS_Message) + new_size + 1;
-            ais_msg = realloc(ais_msg, ais_msg->header.size);
-            memcpy(ais_msg->data, compressed, new_size);
-            ais_msg->data[new_size] = 0;
-
-            ais_msg->is_compressed = TRUE;
-            ais_msg->compressed_size = new_size;
-
-        } else {
-            ais_msg = realloc(ais_msg, ais_msg->header.size);
-            memcpy(ais_msg->data, data, ais_msg->size);
-        }
-
-        free(uncompressed);
-        free(compressed);
-    }
-
-    if (ais_msg->compressed_size) {
-        crm_trace("Queueing CPG message %u to %s (%d compressed bytes)",
-                  ais_msg->id, target, ais_msg->compressed_size);
-    } else {
-        crm_trace("Queueing CPG message %u to %s (%d bytes)",
-                  ais_msg->id, target, ais_msg->size);
-    }
-
-    iov = calloc(1, sizeof(struct iovec));
-    iov->iov_base = ais_msg;
-    iov->iov_len = ais_msg->header.size;
-    cs_message_queue = g_list_append(cs_message_queue, iov);
-    crm_cs_flush();
-
-    free(target);
-    return TRUE;
-}
-
-gboolean
-send_ais_message(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
-{
-    gboolean rc = TRUE;
-    char *data = dump_xml_unformatted(msg);
-
-    rc = send_ais_text(crm_class_cluster, data, local, node, dest);
-    free(data);
-    return rc;
-}
-
 void
-terminate_cs_connection(void)
+terminate_cs_connection(crm_cluster_t *cluster)
 {
     crm_notice("Disconnecting from Corosync");
 
-    if (pcmk_cpg_handle) {
-        crm_trace("Disconnecting CPG");
-        cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group);
-        cpg_finalize(pcmk_cpg_handle);
-        pcmk_cpg_handle = 0;
-
-    } else {
-        crm_info("No CPG connection");
-    }
+    cluster_disconnect_cpg(cluster);
 
     if (pcmk_quorum_handle) {
         crm_trace("Disconnecting quorum");
@@ -478,284 +156,6 @@ terminate_cs_connection(void)
 int ais_membership_timer = 0;
 gboolean ais_membership_force = FALSE;
 
-static gboolean
-ais_dispatch_message(AIS_Message * msg,
-                     gboolean(*dispatch) (int kind, const char *from, const char *data))
-{
-    char *data = NULL;
-    char *uncompressed = NULL;
-
-    xmlNode *xml = NULL;
-
-    CRM_ASSERT(msg != NULL);
-
-    crm_trace("Got new%s message (size=%d, %d, %d)",
-              msg->is_compressed ? " compressed" : "",
-              ais_data_len(msg), msg->size, msg->compressed_size);
-
-    data = msg->data;
-    if (msg->is_compressed && msg->size > 0) {
-        int rc = BZ_OK;
-        unsigned int new_size = msg->size + 1;
-
-        if (check_message_sanity(msg, NULL) == FALSE) {
-            goto badmsg;
-        }
-
-        crm_trace("Decompressing message data");
-        uncompressed = calloc(1, new_size);
-        rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0);
-
-        if (rc != BZ_OK) {
-            crm_err("Decompression failed: %d", rc);
-            goto badmsg;
-        }
-
-        CRM_ASSERT(rc == BZ_OK);
-        CRM_ASSERT(new_size == msg->size);
-
-        data = uncompressed;
-
-    } else if (check_message_sanity(msg, data) == FALSE) {
-        goto badmsg;
-
-    } else if (safe_str_eq("identify", data)) {
-        int pid = getpid();
-        char *pid_s = crm_itoa(pid);
-
-        send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
-        free(pid_s);
-        goto done;
-    }
-
-    if (msg->header.id != crm_class_members) {
-        /* Is this even needed anymore? */
-        crm_get_peer(msg->sender.id, msg->sender.uname);
-    }
-
-    if (msg->header.id == crm_class_rmpeer) {
-        uint32_t id = crm_int_helper(data, NULL);
-
-        crm_info("Removing peer %s/%u", data, id);
-        reap_crm_member(id, NULL);
-        goto done;
-    }
-
-    crm_trace("Payload: %.200s", data);
-    if (dispatch != NULL) {
-        dispatch(msg->header.id, msg->sender.uname, data);
-    }
-
-  done:
-    free(uncompressed);
-    free_xml(xml);
-    return TRUE;
-
-  badmsg:
-    crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
-            " min=%d, total=%d, size=%d, bz2_size=%d",
-            msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
-            ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
-            msg->sender.pid, (int)sizeof(AIS_Message),
-            msg->header.size, msg->size, msg->compressed_size);
-    goto done;
-}
-
-static bool cpg_evicted = FALSE;
-gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
-
-static int
-pcmk_cpg_dispatch(gpointer user_data)
-{
-    int rc = 0;
-
-    pcmk_cpg_dispatch_fn = user_data;
-    rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL);
-    if (rc != CS_OK) {
-        crm_err("Connection to the CPG API failed: %d", rc);
-        pcmk_cpg_handle = 0;
-        return -1;
-
-    } else if(cpg_evicted) {
-        crm_err("Evicted from CPG membership");
-        return -1;
-    }
-    return 0;
-}
-
-static void
-pcmk_cpg_deliver(cpg_handle_t handle,
-                 const struct cpg_name *groupName,
-                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
-{
-    AIS_Message *ais_msg = (AIS_Message *) msg;
-    uint32_t local_nodeid = get_local_nodeid(handle);
-    const char *local_name = get_local_node_name();
-
-    if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) {
-        crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id);
-        return;
-
-    } else if (ais_msg->host.id != 0 && (local_nodeid != ais_msg->host.id)) {
-        /* Not for us */
-        crm_trace("Not for us: %u != %u", ais_msg->host.id, local_nodeid);
-        return;
-    } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, local_name)) {
-        /* Not for us */
-        crm_trace("Not for us: %s != %s", ais_msg->host.uname, local_name);
-        return;
-    }
-
-    ais_msg->sender.id = nodeid;
-    if (ais_msg->sender.size == 0) {
-        crm_node_t *peer = crm_get_peer(nodeid, NULL);
-
-        if (peer == NULL) {
-            crm_err("Peer with nodeid=%u is unknown", nodeid);
-
-        } else if (peer->uname == NULL) {
-            crm_err("No uname for peer with nodeid=%u", nodeid);
-
-        } else {
-            crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
-            ais_msg->sender.size = strlen(peer->uname);
-            memset(ais_msg->sender.uname, 0, MAX_NAME);
-            memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size);
-        }
-    }
-
-    ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn);
-}
-
-static void
-pcmk_cpg_membership(cpg_handle_t handle,
-                    const struct cpg_name *groupName,
-                    const struct cpg_address *member_list, size_t member_list_entries,
-                    const struct cpg_address *left_list, size_t left_list_entries,
-                    const struct cpg_address *joined_list, size_t joined_list_entries)
-{
-    int i;
-    gboolean found = FALSE;
-    static int counter = 0;
-    uint32_t local_nodeid = get_local_nodeid(handle);
-
-    for (i = 0; i < left_list_entries; i++) {
-        crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL);
-
-        crm_info("Left[%d.%d] %s.%u ", counter, i, groupName->value, left_list[i].nodeid);
-        crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
-    }
-
-    for (i = 0; i < joined_list_entries; i++) {
-        crm_info("Joined[%d.%d] %s.%u ", counter, i, groupName->value, joined_list[i].nodeid);
-    }
-
-    for (i = 0; i < member_list_entries; i++) {
-        crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
-
-        crm_info("Member[%d.%d] %s.%u ", counter, i, groupName->value, member_list[i].nodeid);
-
-        /* Anyone that is sending us CPG messages must also be a _CPG_ member.
-         * But its _not_ safe to assume its in the quorum membership.
-         * We may have just found out its dead and are processing the last couple of messages it sent
-         */
-        crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
-        if(peer && peer->state && crm_is_peer_active(peer) == FALSE) {
-            time_t now = time(NULL);
-
-            /* Co-opt the otherwise unused votes field */
-            if(peer->votes == 0) {
-                peer->votes = now;
-
-            } else if(now > (60 + peer->votes)) {
-                /* On the otherhand, if we're still getting messages, at a certain point
-                 * we need to acknowledge our internal cache is probably wrong
-                 *
-                 * Set the threshold to 1 minute
-                 */
-                crm_err("Node %s[%u] appears to be online even though we think it is dead", peer->uname, peer->id);
-                crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0);
-                peer->votes = 0;
-            }
-        }
-
-        if (local_nodeid == member_list[i].nodeid) {
-            found = TRUE;
-        }
-    }
-
-    if (!found) {
-        crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
-        cpg_evicted = TRUE;
-    }
-
-    counter++;
-}
-
-cpg_callbacks_t cpg_callbacks = {
-    .cpg_deliver_fn = pcmk_cpg_deliver,
-    .cpg_confchg_fn = pcmk_cpg_membership,
-};
-
-static gboolean
-init_cpg_connection(gboolean(*dispatch) (int kind, const char *from, const char *data),
-                    void (*destroy) (gpointer), uint32_t * nodeid)
-{
-    int rc = -1;
-    int fd = 0;
-    int retries = 0;
-    uint32_t id = 0;
-    crm_node_t *peer = NULL;
-
-    struct mainloop_fd_callbacks cpg_fd_callbacks = {
-        .dispatch = pcmk_cpg_dispatch,
-        .destroy = destroy,
-    };
-
-    cpg_evicted = FALSE;
-    strncpy(pcmk_cpg_group.value, crm_system_name, 128);
-    pcmk_cpg_group.length = strlen(crm_system_name) + 1;
-
-    cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks));
-    if (rc != CS_OK) {
-        crm_err("Could not connect to the Cluster Process Group API: %d\n", rc);
-        goto bail;
-    }
-
-    id = get_local_nodeid(pcmk_cpg_handle);
-    if (id == 0) {
-        crm_err("Could not get local node id from the CPG API");
-        goto bail;
-
-    } else if(nodeid) {
-        *nodeid = id;
-    }
-
-    retries = 0;
-    cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group));
-    if (rc != CS_OK) {
-        crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
-        goto bail;
-    }
-
-    rc = cpg_fd_get(pcmk_cpg_handle, &fd);
-    if (rc != CS_OK) {
-        crm_err("Could not obtain the CPG API connection: %d\n", rc);
-        goto bail;
-    }
-
-    mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, dispatch, &cpg_fd_callbacks);
-
-  bail:
-    if (rc != CS_OK) {
-        cpg_finalize(pcmk_cpg_handle);
-        return FALSE;
-    }
-
-    peer = crm_get_peer(id, NULL);
-    crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
-    return TRUE;
-}
 
 static int
 pcmk_quorum_dispatch(gpointer user_data)
@@ -940,7 +340,7 @@ init_cs_connection_once(crm_cluster_t * cluster)
         return FALSE;
     }
 
-    if (init_cpg_connection(cluster->cs_dispatch, cluster->destroy, NULL) == FALSE) {
+    if (cluster_connect_cpg(cluster) == FALSE) {
         return FALSE;
     }
     crm_info("Connection to '%s': established", name_for_cluster_type(stack));
diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c
new file mode 100644
index 0000000..903576e
--- /dev/null
+++ b/lib/cluster/cpg.c
@@ -0,0 +1,689 @@
+/*
+ * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ */
+
+#include <crm_internal.h>
+#include <bzlib.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+#include <crm/common/ipc.h>
+#include <crm/cluster/internal.h>
+#include <crm/common/mainloop.h>
+#include <sys/utsname.h>
+
+#include <qb/qbipcc.h>
+#include <qb/qbutil.h>
+
+#include <corosync/corodefs.h>
+#include <corosync/corotypes.h>
+#include <corosync/hdb.h>
+#include <corosync/cpg.h>
+
+#include <crm/msg_xml.h>
+
+cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
+
+static bool cpg_evicted = FALSE;
+gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
+
+#define cs_repeat(counter, max, code) do {		\
+	code;						\
+	if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {  \
+	    counter++;					\
+	    crm_debug("Retrying operation after %ds", counter);	\
+	    sleep(counter);				\
+	} else {                                        \
+            break;                                      \
+        }                                               \
+    } while(counter < max)
+
+void
+cluster_disconnect_cpg(crm_cluster_t *cluster)
+{
+    pcmk_cpg_handle = 0;
+    if (cluster->cpg_handle) {
+        crm_trace("Disconnecting CPG");
+        cpg_leave(cluster->cpg_handle, &cluster->group);
+        cpg_finalize(cluster->cpg_handle);
+        cluster->cpg_handle = 0;
+
+    } else {
+        crm_info("No CPG connection");
+    }
+}
+
+uint32_t get_local_nodeid(cpg_handle_t handle)
+{
+    int rc = CS_OK;
+    int retries = 0;
+    static uint32_t local_nodeid = 0;
+    cpg_handle_t local_handle = handle;
+    cpg_callbacks_t cb = { };
+
+    if(local_nodeid != 0) {
+        return local_nodeid;
+    }
+
+#if 0
+    /* Should not be necessary */
+    if(get_cluster_type() == pcmk_cluster_classic_ais) {
+        get_ais_details(&local_nodeid, NULL);
+        goto done;
+    }
+#endif
+
+    if(handle == 0) {
+        crm_trace("Creating connection");
+        cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
+    }
+
+    if (rc == CS_OK) {
+        retries = 0;
+        crm_trace("Performing lookup");
+        cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
+    }
+
+    if (rc != CS_OK) {
+        crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
+    }
+    if(handle == 0) {
+        crm_trace("Closing connection");
+        cpg_finalize(local_handle);
+    }
+    crm_debug("Local nodeid is %u", local_nodeid);
+    return local_nodeid;
+}
+
+
+GListPtr cs_message_queue = NULL;
+int cs_message_timer = 0;
+
+static ssize_t crm_cs_flush(gpointer data);
+
+static gboolean
+crm_cs_flush_cb(gpointer data)
+{
+    cs_message_timer = 0;
+    crm_cs_flush(data);
+    return FALSE;
+}
+
+#define CS_SEND_MAX 200
+static ssize_t
+crm_cs_flush(gpointer data)
+{
+    int sent = 0;
+    ssize_t rc = 0;
+    int queue_len = 0;
+    static unsigned int last_sent = 0;
+    cpg_handle_t *handle = (cpg_handle_t *)data;
+
+    if (*handle == 0) {
+        crm_trace("Connection is dead");
+        return pcmk_ok;
+    }
+
+    queue_len = g_list_length(cs_message_queue);
+    if ((queue_len % 1000) == 0 && queue_len > 1) {
+        crm_err("CPG queue has grown to %d", queue_len);
+
+    } else if (queue_len == CS_SEND_MAX) {
+        crm_warn("CPG queue has grown to %d", queue_len);
+    }
+
+    if (cs_message_timer) {
+        /* There is already a timer, wait until it goes off */
+        crm_trace("Timer active %d", cs_message_timer);
+        return pcmk_ok;
+    }
+
+    while (cs_message_queue && sent < CS_SEND_MAX) {
+        struct iovec *iov = cs_message_queue->data;
+
+        errno = 0;
+        rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
+
+        if (rc != CS_OK) {
+            break;
+        }
+
+        sent++;
+        last_sent++;
+        crm_trace("CPG message sent, size=%d", iov->iov_len);
+
+        cs_message_queue = g_list_remove(cs_message_queue, iov);
+        free(iov[0].iov_base);
+        free(iov);
+    }
+
+    queue_len -= sent;
+    if (sent > 1 || cs_message_queue) {
+        crm_info("Sent %d CPG messages  (%d remaining, last=%u): %s (%d)",
+                 sent, queue_len, last_sent, ais_error2text(rc), rc);
+    } else {
+        crm_trace("Sent %d CPG messages  (%d remaining, last=%u): %s (%d)",
+                  sent, queue_len, last_sent, ais_error2text(rc), rc);
+    }
+
+    if (cs_message_queue) {
+        uint32_t delay_ms = 100;
+        if(rc != CS_OK) {
+            /* Proportionally more if sending failed but cap at 1s */
+            delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
+        }
+        cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
+    }
+
+    return rc;
+}
+
+gboolean
+send_cpg_iov(struct iovec * iov)
+{
+    static unsigned int queued = 0;
+
+    queued++;
+    crm_trace("Queueing CPG message %u (%d bytes)", queued, iov->iov_len);
+    cs_message_queue = g_list_append(cs_message_queue, iov);
+    crm_cs_flush(&pcmk_cpg_handle);
+    return TRUE;
+}
+
+static int
+pcmk_cpg_dispatch(gpointer user_data)
+{
+    int rc = 0;
+    crm_cluster_t *cluster = (crm_cluster_t*) user_data;
+
+    rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ALL);
+    if (rc != CS_OK) {
+        crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
+        cluster->cpg_handle = 0;
+        return -1;
+
+    } else if(cpg_evicted) {
+        crm_err("Evicted from CPG membership");
+        return -1;
+    }
+    return 0;
+}
+
+/*
+static void
+pcmk_cpg_deliver_message(cpg_handle_t handle,
+                         const struct cpg_name *groupName,
+                         uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+{
+    uint32_t kind = 0;
+    const char *from = NULL;
+    char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
+
+    free(data);
+}
+*/
+
+char *
+pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
+                        uint32_t *kind, const char **from)
+{
+    char *data = NULL;
+    AIS_Message *msg = (AIS_Message *) content;
+
+    if(handle) {
+        /* 'msg' came from CPG not the plugin
+         * Do filtering and field massaging
+         */
+        uint32_t local_nodeid = get_local_nodeid(handle);
+        const char *local_name = get_local_node_name();
+
+        if (msg->sender.id > 0 && msg->sender.id != nodeid) {
+            crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
+            return NULL;
+
+        } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
+            /* Not for us */
+            crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
+            return NULL;
+        } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
+            /* Not for us */
+            crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
+            return NULL;
+        }
+
+        msg->sender.id = nodeid;
+        if (msg->sender.size == 0) {
+            crm_node_t *peer = crm_get_peer(nodeid, NULL);
+
+            if (peer == NULL) {
+                crm_err("Peer with nodeid=%u is unknown", nodeid);
+
+            } else if (peer->uname == NULL) {
+                crm_err("No uname for peer with nodeid=%u", nodeid);
+
+            } else {
+                crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
+                msg->sender.size = strlen(peer->uname);
+                memset(msg->sender.uname, 0, MAX_NAME);
+                memcpy(msg->sender.uname, peer->uname, msg->sender.size);
+            }
+        }
+    }
+
+    crm_trace("Got new%s message (size=%d, %d, %d)",
+              msg->is_compressed ? " compressed" : "",
+              ais_data_len(msg), msg->size, msg->compressed_size);
+
+    if (kind != NULL) {
+        *kind = msg->header.id;
+    }
+    if (from != NULL) {
+        *from = msg->sender.uname;
+    }
+
+    if (msg->is_compressed && msg->size > 0) {
+        int rc = BZ_OK;
+        char *uncompressed = NULL;
+        unsigned int new_size = msg->size + 1;
+
+        if (check_message_sanity(msg, NULL) == FALSE) {
+            goto badmsg;
+        }
+
+        crm_trace("Decompressing message data");
+        uncompressed = calloc(1, new_size);
+        rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
+
+        if (rc != BZ_OK) {
+            crm_err("Decompression failed: %d", rc);
+            goto badmsg;
+        }
+
+        CRM_ASSERT(rc == BZ_OK);
+        CRM_ASSERT(new_size == msg->size);
+
+        data = uncompressed;
+
+    } else if (check_message_sanity(msg, data) == FALSE) {
+        goto badmsg;
+
+    } else if (safe_str_eq("identify", data)) {
+        int pid = getpid();
+        char *pid_s = crm_itoa(pid);
+
+        send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
+        free(pid_s);
+        return NULL;
+
+    } else {
+        data = strdup(msg->data);
+    }
+
+    if (msg->header.id != crm_class_members) {
+        /* Is this even needed anymore? */
+        crm_get_peer(msg->sender.id, msg->sender.uname);
+    }
+
+    if (msg->header.id == crm_class_rmpeer) {
+        uint32_t id = crm_int_helper(data, NULL);
+
+        crm_info("Removing peer %s/%u", data, id);
+        reap_crm_member(id, NULL);
+        free(data);
+        return NULL;
+
+#if SUPPORT_PLUGIN
+    } else if (is_classic_ais_cluster()) {
+        plugin_handle_membership(msg);
+#endif
+    }
+
+    crm_trace("Payload: %.200s", data);
+    return data;
+
+  badmsg:
+    crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
+            " min=%d, total=%d, size=%d, bz2_size=%d",
+            msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
+            ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
+            msg->sender.pid, (int)sizeof(AIS_Message),
+            msg->header.size, msg->size, msg->compressed_size);
+
+    free(data);
+    return NULL;
+}
+
+void
+pcmk_cpg_membership(cpg_handle_t handle,
+                    const struct cpg_name *groupName,
+                    const struct cpg_address *member_list, size_t member_list_entries,
+                    const struct cpg_address *left_list, size_t left_list_entries,
+                    const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+    int i;
+    gboolean found = FALSE;
+    static int counter = 0;
+    uint32_t local_nodeid = get_local_nodeid(handle);
+
+    for (i = 0; i < left_list_entries; i++) {
+        crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL);
+
+        crm_info("Left[%d.%d] %s.%u ", counter, i, groupName->value, left_list[i].nodeid);
+        crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
+    }
+
+    for (i = 0; i < joined_list_entries; i++) {
+        crm_info("Joined[%d.%d] %s.%u ", counter, i, groupName->value, joined_list[i].nodeid);
+    }
+
+    for (i = 0; i < member_list_entries; i++) {
+        crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
+
+        crm_info("Member[%d.%d] %s.%u ", counter, i, groupName->value, member_list[i].nodeid);
+
+        /* Anyone that is sending us CPG messages must also be a _CPG_ member.
+         * But its _not_ safe to assume its in the quorum membership.
+         * We may have just found out its dead and are processing the last couple of messages it sent
+         */
+        crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
+        if(peer && peer->state && crm_is_peer_active(peer) == FALSE) {
+            time_t now = time(NULL);
+
+            /* Co-opt the otherwise unused votes field */
+            if(peer->votes == 0) {
+                peer->votes = now;
+
+            } else if(now > (60 + peer->votes)) {
+                /* On the otherhand, if we're still getting messages, at a certain point
+                 * we need to acknowledge our internal cache is probably wrong
+                 *
+                 * Set the threshold to 1 minute
+                 */
+                crm_err("Node %s[%u] appears to be online even though we think it is dead", peer->uname, peer->id);
+                crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0);
+                peer->votes = 0;
+            }
+        }
+
+        if (local_nodeid == member_list[i].nodeid) {
+            found = TRUE;
+        }
+    }
+
+    if (!found) {
+        crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
+        cpg_evicted = TRUE;
+    }
+
+    counter++;
+}
+
+gboolean
+cluster_connect_cpg(crm_cluster_t *cluster)
+{
+    int rc = -1;
+    int fd = 0;
+    int retries = 0;
+    uint32_t id = 0;
+    crm_node_t *peer = NULL;
+    cpg_handle_t handle = 0;
+
+    struct mainloop_fd_callbacks cpg_fd_callbacks = {
+        .dispatch = pcmk_cpg_dispatch,
+        .destroy = cluster->destroy,
+    };
+
+    cpg_callbacks_t cpg_callbacks = {
+        .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
+        .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
+        /* .cpg_deliver_fn = pcmk_cpg_deliver, */
+        /* .cpg_confchg_fn = pcmk_cpg_membership, */
+    };
+
+    cpg_evicted = FALSE;
+    cluster->group.length = 0;
+    cluster->group.value[0] = 0;
+
+    strncpy(cluster->group.value, crm_system_name, 128);
+    cluster->group.length = strlen(crm_system_name) + 1;
+
+    cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
+    if (rc != CS_OK) {
+        crm_err("Could not connect to the Cluster Process Group API: %d\n", rc);
+        goto bail;
+    }
+
+    id = get_local_nodeid(handle);
+    if (id == 0) {
+        crm_err("Could not get local node id from the CPG API");
+        goto bail;
+
+    }
+    cluster->nodeid = id;
+
+    retries = 0;
+    cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
+    if (rc != CS_OK) {
+        crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
+        goto bail;
+    }
+
+    rc = cpg_fd_get(handle, &fd);
+    if (rc != CS_OK) {
+        crm_err("Could not obtain the CPG API connection: %d\n", rc);
+        goto bail;
+    }
+
+    pcmk_cpg_handle = handle;
+    cluster->cpg_handle = handle;
+    mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
+
+  bail:
+    if (rc != CS_OK) {
+        cpg_finalize(handle);
+        return FALSE;
+    }
+
+    peer = crm_get_peer(id, NULL);
+    crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
+    return TRUE;
+}
+
+gboolean
+send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
+{
+    gboolean rc = TRUE;
+    char *data = NULL;
+
+    data = dump_xml_unformatted(msg);
+    rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
+    free(data);
+    return rc;
+}
+
+gboolean
+send_cluster_text(int class, const char *data,
+              gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
+{
+    static int msg_id = 0;
+    static int local_pid = 0;
+    static int local_name_len = 0;
+    static const char *local_name = NULL;
+
+    char *target = NULL;
+    struct iovec *iov;
+    AIS_Message *msg = NULL;
+    enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
+
+    /* There are only 6 handlers registered to crm_lib_service in plugin.c */
+    CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
+              return FALSE);
+
+#if !SUPPORT_PLUGIN
+    CRM_CHECK(dest != crm_msg_ais, return FALSE);
+#endif
+
+    if(local_name == NULL) {
+        local_name = get_local_node_name();
+    }
+    if(local_name_len == 0 && local_name) {
+        local_name_len = strlen(local_name);
+    }
+
+    if (data == NULL) {
+        data = "";
+    }
+
+    if (local_pid == 0) {
+        local_pid = getpid();
+    }
+
+    if (sender == crm_msg_none) {
+        sender = local_pid;
+    }
+
+    msg = calloc(1, sizeof(AIS_Message));
+
+    msg_id++;
+    msg->id = msg_id;
+    msg->header.id = class;
+    msg->header.error = CS_OK;
+
+    msg->host.type = dest;
+    msg->host.local = local;
+
+    if (node) {
+        if (node->uname) {
+            target = strdup(node->uname);
+            msg->host.size = strlen(node->uname);
+            memset(msg->host.uname, 0, MAX_NAME);
+            memcpy(msg->host.uname, node->uname, msg->host.size);
+        } else {
+            target = g_strdup_printf("%u", node->id);
+        }
+        msg->host.id = node->id;
+    } else {
+        target = strdup("all");
+    }
+
+    msg->sender.id = 0;
+    msg->sender.type = sender;
+    msg->sender.pid = local_pid;
+    msg->sender.size = local_name_len;
+    memset(msg->sender.uname, 0, MAX_NAME);
+    memcpy(msg->sender.uname, local_name, msg->sender.size);
+
+    msg->size = 1 + strlen(data);
+    msg->header.size = sizeof(AIS_Message) + msg->size;
+
+    if (msg->size < CRM_BZ2_THRESHOLD) {
+        msg = realloc(msg, msg->header.size);
+        memcpy(msg->data, data, msg->size);
+
+    } else {
+        char *compressed = NULL;
+        unsigned int new_size = 0;
+        char *uncompressed = strdup(data);
+
+        if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) {
+
+            msg->header.size = sizeof(AIS_Message) + new_size + 1;
+            msg = realloc(msg, msg->header.size);
+            memcpy(msg->data, compressed, new_size);
+            msg->data[new_size] = 0;
+
+            msg->is_compressed = TRUE;
+            msg->compressed_size = new_size;
+
+        } else {
+            msg = realloc(msg, msg->header.size);
+            memcpy(msg->data, data, msg->size);
+        }
+
+        free(uncompressed);
+        free(compressed);
+    }
+
+    iov = calloc(1, sizeof(struct iovec));
+    iov->iov_base = msg;
+    iov->iov_len = msg->header.size;
+
+    if (msg->compressed_size) {
+        crm_trace("Queueing CPG message %u to %s (%d bytes, %d bytes compressed payload): %.200s",
+                  msg->id, target, iov->iov_len, msg->compressed_size, data);
+    } else {
+        crm_trace("Queueing CPG message %u to %s (%d bytes, %d bytes payload): %.200s",
+                  msg->id, target, iov->iov_len, msg->size, data);
+    }
+
+#if SUPPORT_PLUGIN
+    /* The plugin is the only time we dont use CPG messaging */
+    if(get_cluster_type() == pcmk_cluster_classic_ais) {
+        return send_plugin_text(class, iov);
+    }
+#endif
+
+    send_cpg_iov(iov);
+
+    free(target);
+    return TRUE;
+}
+
+enum crm_ais_msg_types
+text2msg_type(const char *text)
+{
+    int type = crm_msg_none;
+
+    CRM_CHECK(text != NULL, return type);
+    if (safe_str_eq(text, "ais")) {
+        type = crm_msg_ais;
+    } else if (safe_str_eq(text, "crm_plugin")) {
+        type = crm_msg_ais;
+    } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
+        type = crm_msg_cib;
+    } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
+        type = crm_msg_crmd;
+    } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
+        type = crm_msg_crmd;
+    } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
+        type = crm_msg_te;
+    } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
+        type = crm_msg_pe;
+    } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
+        type = crm_msg_lrmd;
+    } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
+        type = crm_msg_stonithd;
+    } else if (safe_str_eq(text, "stonith-ng")) {
+        type = crm_msg_stonith_ng;
+    } else if (safe_str_eq(text, "attrd")) {
+        type = crm_msg_attrd;
+
+    } else {
+        /* This will normally be a transient client rather than
+         * a cluster daemon.  Set the type to the pid of the client
+         */
+        int scan_rc = sscanf(text, "%d", &type);
+
+        if (scan_rc != 1) {
+            /* Ensure its sane */
+            type = crm_msg_none;
+        }
+    }
+    return type;
+}
diff --git a/lib/cluster/legacy.c b/lib/cluster/legacy.c
index 14749e4..8b16f7e 100644
--- a/lib/cluster/legacy.c
+++ b/lib/cluster/legacy.c
@@ -31,12 +31,6 @@
 #  include <corosync/corodefs.h>
 #  include <corosync/cpg.h>
 #  include <corosync/cfg.h>
-cpg_handle_t pcmk_cpg_handle = 0;
-
-struct cpg_name pcmk_cpg_group = {
-    .length = 0,
-    .value[0] = 0,
-};
 #endif
 
 #if HAVE_CMAP
@@ -50,88 +44,8 @@ cman_handle_t pcmk_cman_handle = NULL;
 
 int ais_membership_timer = 0;
 gboolean ais_membership_force = FALSE;
-int ais_dispatch(gpointer user_data);
-
-#define cs_repeat(counter, max, code) do {		\
-	code;						\
-	if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {  \
-	    counter++;					\
-	    crm_debug("Retrying operation after %ds", counter);	\
-	    sleep(counter);				\
-	} else {                                        \
-            break;                                      \
-        }                                               \
-    } while(counter < max)
-
-enum crm_ais_msg_types
-text2msg_type(const char *text)
-{
-    int type = crm_msg_none;
-
-    CRM_CHECK(text != NULL, return type);
-    if (safe_str_eq(text, "ais")) {
-        type = crm_msg_ais;
-    } else if (safe_str_eq(text, "crm_plugin")) {
-        type = crm_msg_ais;
-    } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
-        type = crm_msg_cib;
-    } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
-        type = crm_msg_crmd;
-    } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
-        type = crm_msg_crmd;
-    } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
-        type = crm_msg_te;
-    } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
-        type = crm_msg_pe;
-    } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
-        type = crm_msg_lrmd;
-    } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
-        type = crm_msg_stonithd;
-    } else if (safe_str_eq(text, "stonith-ng")) {
-        type = crm_msg_stonith_ng;
-    } else if (safe_str_eq(text, "attrd")) {
-        type = crm_msg_attrd;
-
-    } else {
-        /* This will normally be a transient client rather than
-         * a cluster daemon.  Set the type to the pid of the client
-         */
-        int scan_rc = sscanf(text, "%d", &type);
-
-        if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
-            /* Ensure its sane */
-            type = crm_msg_none;
-        }
-    }
-    return type;
-}
+int plugin_dispatch(gpointer user_data);
 
-char *
-get_ais_data(const AIS_Message * msg)
-{
-    int rc = BZ_OK;
-    char *uncompressed = NULL;
-    unsigned int new_size = msg->size + 1;
-
-    if (msg->is_compressed == FALSE) {
-        crm_trace("Returning uncompressed message data");
-        uncompressed = strdup(msg->data);
-
-    } else {
-        crm_trace("Decompressing message data");
-        uncompressed = calloc(1, new_size);
-
-        rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, (char *)msg->data,
-                                        msg->compressed_size, 1, 0);
-
-        CRM_ASSERT(rc == BZ_OK);
-        CRM_ASSERT(new_size == msg->size);
-    }
-
-    return uncompressed;
-}
-
-#if SUPPORT_COROSYNC
 int ais_fd_sync = -1;
 int ais_fd_async = -1;          /* never send messages via this channel */
 void *ais_ipc_ctx = NULL;
@@ -160,9 +74,6 @@ get_ais_details(uint32_t * id, char **uname)
     header.id = crm_class_nodeid;
     header.size = sizeof(cs_ipc_header_response_t);
 
-    CRM_CHECK(id != NULL, return FALSE);
-    CRM_CHECK(uname != NULL, return FALSE);
-
     iov.iov_base = &header;
     iov.iov_len = header.size;
 
@@ -203,140 +114,7 @@ get_ais_details(uint32_t * id, char **uname)
     return TRUE;
 }
 
-static uint32_t get_local_nodeid(cpg_handle_t handle)
-{
-    int rc = CS_OK;
-    int retries = 0;
-    static uint32_t local_nodeid = 0;
-    cpg_handle_t local_handle = handle;
-    cpg_callbacks_t cb = { };
-
-    if(local_nodeid != 0) {
-        return local_nodeid;
-    }
-
-#if 0
-    /* Should not be necessary */
-    if(get_cluster_type() == pcmk_cluster_classic_ais) {
-        get_ais_details(&local_nodeid, NULL);
-        goto done;
-    }
-#endif
-
-    if(local_handle == 0) {
-        crm_trace("Creating connection");
-        cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
-    }
-
-    if (rc == CS_OK) {
-        retries = 0;
-        crm_trace("Performing lookup");
-        cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
-    }
-
-    if (rc != CS_OK) {
-        crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
-    }
-
-    if(handle != local_handle) {
-        crm_trace("Closing connection %u", local_handle);
-        cpg_finalize(local_handle);
-    }
-
-    crm_debug("Local nodeid is %u", local_nodeid);
-    return local_nodeid;
-}
-
-GListPtr cs_message_queue = NULL;
-int cs_message_timer = 0;
-
-static ssize_t crm_cs_flush(void);
-
-static gboolean
-crm_cs_flush_cb(gpointer data)
-{
-    cs_message_timer = 0;
-    crm_cs_flush();
-    return FALSE;
-}
-
-#define CS_SEND_MAX 200
-static ssize_t
-crm_cs_flush(void)
-{
-    int sent = 0;
-    ssize_t rc = 0;
-    int queue_len = 0;
-    static unsigned int last_sent = 0;
-
-    if (pcmk_cpg_handle == 0) {
-        crm_trace("Connection is dead");
-        return pcmk_ok;
-    }
-
-    queue_len = g_list_length(cs_message_queue);
-    if ((queue_len % 1000) == 0 && queue_len > 1) {
-        crm_err("CPG queue has grown to %d", queue_len);
-
-    } else if (queue_len == CS_SEND_MAX) {
-        crm_warn("CPG queue has grown to %d", queue_len);
-    }
-
-    if (cs_message_timer) {
-        /* There is already a timer, wait until it goes off */
-        crm_trace("Timer active %d", cs_message_timer);
-        return pcmk_ok;
-    }
-
-    while (cs_message_queue && sent < CS_SEND_MAX) {
-        AIS_Message *header = NULL;
-        struct iovec *iov = cs_message_queue->data;
-
-        errno = 0;
-        rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, iov, 1);
-
-        if (rc != CS_OK) {
-            break;
-        }
-
-        sent++;
-        header = iov->iov_base;
-        last_sent = header->id;
-        if (header->compressed_size) {
-            crm_trace("CPG message %d (%d compressed bytes) sent",
-                      header->id, header->compressed_size);
-        } else {
-            crm_trace("CPG message %d (%d bytes) sent: %.200s",
-                      header->id, header->size, header->data);
-        }
-
-        cs_message_queue = g_list_remove(cs_message_queue, iov);
-        free(iov[0].iov_base);
-        free(iov);
-    }
-
-    queue_len -= sent;
-    if (sent > 1 || cs_message_queue) {
-        crm_info("Sent %d CPG messages  (%d remaining, last=%u): %s (%d)",
-                 sent, queue_len, last_sent, ais_error2text(rc), rc);
-    } else {
-        crm_trace("Sent %d CPG messages  (%d remaining, last=%u): %s (%d)",
-                  sent, queue_len, last_sent, ais_error2text(rc), rc);
-    }
-
-    if (cs_message_queue) {
-        uint32_t delay_ms = 100;
-        if(rc != CS_OK) {
-            /* Proportionally more if sending failed but cap at 1s */
-            delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
-        }
-        cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, NULL);
-    }
-
-    return rc;
-}
-
-static bool
+bool
 send_plugin_text(int class, struct iovec *iov)
 {
     int rc = CS_OK;
@@ -386,154 +164,8 @@ send_plugin_text(int class, struct iovec *iov)
     return (rc == CS_OK);
 }
 
-gboolean
-send_ais_text(int class, const char *data,
-              gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
-{
-    static int msg_id = 0;
-    static int local_pid = 0;
-    static int local_name_len = 0;
-    static const char *local_name = NULL;
-
-    char *target = NULL;
-    struct iovec *iov;
-    AIS_Message *ais_msg = NULL;
-    enum cluster_type_e cluster_type = get_cluster_type();
-    enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
-
-    /* There are only 6 handlers registered to crm_lib_service in plugin.c */
-    CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
-              return FALSE);
-
-    CRM_CHECK(dest != crm_msg_ais, return FALSE);
-
-    if(local_name == NULL) {
-        local_name = get_local_node_name();
-    }
-    if(local_name_len == 0 && local_name) {
-        local_name_len = strlen(local_name);
-    }
-
-    if (data == NULL) {
-        data = "";
-    }
-
-    if (local_pid == 0) {
-        local_pid = getpid();
-    }
-
-    if (sender == crm_msg_none) {
-        sender = local_pid;
-    }
-
-    ais_msg = calloc(1, sizeof(AIS_Message));
-
-    ais_msg->id = msg_id++;
-    ais_msg->header.id = class;
-    ais_msg->header.error = CS_OK;
-
-    ais_msg->host.type = dest;
-    ais_msg->host.local = local;
-
-    if (node) {
-        if (node->uname) {
-            target = strdup(node->uname);
-            ais_msg->host.size = strlen(node->uname);
-            memset(ais_msg->host.uname, 0, MAX_NAME);
-            memcpy(ais_msg->host.uname, node->uname, ais_msg->host.size);
-        } else {
-            target = g_strdup_printf("%u", node->id);
-        }
-        ais_msg->host.id = node->id;
-    } else {
-        target = strdup("all");
-    }
-
-    ais_msg->sender.id = 0;
-    ais_msg->sender.type = sender;
-    ais_msg->sender.pid = local_pid;
-    ais_msg->sender.size = local_name_len;
-    memset(ais_msg->sender.uname, 0, MAX_NAME);
-    memcpy(ais_msg->sender.uname, local_name, ais_msg->sender.size);
-
-    ais_msg->size = 1 + strlen(data);
-    ais_msg->header.size = sizeof(AIS_Message) + ais_msg->size;
-
-    if (ais_msg->size < CRM_BZ2_THRESHOLD) {
-        ais_msg = realloc(ais_msg, ais_msg->header.size);
-        memcpy(ais_msg->data, data, ais_msg->size);
-
-    } else {
-        char *compressed = NULL;
-        unsigned int new_size = 0;
-        char *uncompressed = strdup(data);
-
-        if (crm_compress_string(uncompressed, ais_msg->size, 0, &compressed, &new_size)) {
-
-            ais_msg->header.size = sizeof(AIS_Message) + new_size + 1;
-            ais_msg = realloc(ais_msg, ais_msg->header.size);
-            memcpy(ais_msg->data, compressed, new_size);
-            ais_msg->data[new_size] = 0;
-
-            ais_msg->is_compressed = TRUE;
-            ais_msg->compressed_size = new_size;
-
-        } else {
-            ais_msg = realloc(ais_msg, ais_msg->header.size);
-            memcpy(ais_msg->data, data, ais_msg->size);
-        }
-
-        free(uncompressed);
-        free(compressed);
-    }
-
-    iov = calloc(1, sizeof(struct iovec));
-    iov->iov_base = ais_msg;
-    iov->iov_len = ais_msg->header.size;
-
-    if (ais_msg->compressed_size) {
-        crm_trace("Queueing %s message %u to %s (%d compressed bytes)",
-                  cluster_type == pcmk_cluster_classic_ais?"plugin":"CPG",
-                  ais_msg->id, target, ais_msg->compressed_size);
-    } else {
-        crm_trace("Queueing %s message %u to %s (%d bytes)",
-                  cluster_type == pcmk_cluster_classic_ais?"plugin":"CPG",
-                  ais_msg->id, target, ais_msg->size);
-    }
-
-    /* The plugin is the only time we dont use CPG messaging */
-    if(cluster_type == pcmk_cluster_classic_ais) {
-        return send_plugin_text(class, iov);
-    }
-
-    cs_message_queue = g_list_append(cs_message_queue, iov);
-    crm_cs_flush();
-
-    free(target);
-    return TRUE;
-}
-
-gboolean
-send_ais_message(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
-{
-    gboolean rc = TRUE;
-    char *data = NULL;
-
-    if (is_classic_ais_cluster()) {
-        if (ais_fd_async < 0) {
-            crm_err("Not connected to AIS: %d", ais_fd_async);
-            return FALSE;
-        }
-    }
-
-    data = dump_xml_unformatted(msg);
-    rc = send_ais_text(crm_class_cluster, data, local, node, dest);
-    free(data);
-    return rc;
-}
-
 void
-terminate_cs_connection(void)
+terminate_cs_connection(crm_cluster_t *cluster)
 {
     crm_notice("Disconnecting from Corosync");
 
@@ -545,20 +177,8 @@ terminate_cs_connection(void)
         } else {
             crm_info("No plugin connection");
         }
-
-    } else {
-        if (pcmk_cpg_handle) {
-            crm_info("Disconnecting CPG");
-            if (cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group) == CS_OK) {
-                crm_info("Destroying CPG");
-                cpg_finalize(pcmk_cpg_handle);
-            }
-            pcmk_cpg_handle = 0;
-
-        } else {
-            crm_info("No CPG connection");
-        }
     }
+    cluster_disconnect_cpg(cluster);
 
 #  if SUPPORT_CMAN
     if (is_cman_cluster()) {
@@ -578,155 +198,66 @@ terminate_cs_connection(void)
     ais_fd_sync = -1;
 }
 
-static crm_node_t *
-crm_update_ais_node(xmlNode * member, long long seq)
-{
-    const char *id_s = crm_element_value(member, "id");
-    const char *addr = crm_element_value(member, "addr");
-    const char *uname = crm_element_value(member, "uname");
-    const char *state = crm_element_value(member, "state");
-    const char *born_s = crm_element_value(member, "born");
-    const char *seen_s = crm_element_value(member, "seen");
-    const char *votes_s = crm_element_value(member, "votes");
-    const char *procs_s = crm_element_value(member, "processes");
-
-    int votes = crm_int_helper(votes_s, NULL);
-    unsigned int id = crm_int_helper(id_s, NULL);
-    unsigned int procs = crm_int_helper(procs_s, NULL);
-
-    /* TODO: These values will contain garbage if version < 0.7.1 */
-    uint64_t born = crm_int_helper(born_s, NULL);
-    uint64_t seen = crm_int_helper(seen_s, NULL);
-
-    return crm_update_peer(__FUNCTION__, id, born, seen, votes, procs, uname, uname, addr, state);
-}
-
-static gboolean
-ais_dispatch_message(AIS_Message * msg,
-                     gboolean(*dispatch) (int kind, const char *from, const char *data))
+void
+plugin_handle_membership(AIS_Message *msg)
 {
-    char *data = NULL;
-    char *uncompressed = NULL;
-
-    xmlNode *xml = NULL;
+    if (msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) {
+        xmlNode *member = NULL;
+        const char *value = NULL;
+        gboolean quorate = FALSE;
+        xmlNode *xml = string2xml(msg->data);
 
-    CRM_ASSERT(msg != NULL);
-
-    crm_trace("Got new%s message (size=%d, %d, %d)",
-              msg->is_compressed ? " compressed" : "",
-              ais_data_len(msg), msg->size, msg->compressed_size);
-
-    data = msg->data;
-    if (msg->is_compressed && msg->size > 0) {
-        int rc = BZ_OK;
-        unsigned int new_size = msg->size + 1;
-
-        if (check_message_sanity(msg, NULL) == FALSE) {
-            goto badmsg;
+        if (xml == NULL) {
+            crm_err("Invalid membership update: %s", msg->data);
+            return;
         }
 
-        crm_trace("Decompressing message data");
-        uncompressed = calloc(1, new_size);
-        rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0);
-
-        if (rc != BZ_OK) {
-            crm_err("Decompression failed: %d", rc);
-            goto badmsg;
+        value = crm_element_value(xml, "quorate");
+        CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:"); return);
+        if (crm_is_true(value)) {
+            quorate = TRUE;
         }
 
-        CRM_ASSERT(rc == BZ_OK);
-        CRM_ASSERT(new_size == msg->size);
-
-        data = uncompressed;
-
-    } else if (check_message_sanity(msg, data) == FALSE) {
-        goto badmsg;
-
-    } else if (safe_str_eq("identify", data)) {
-        int pid = getpid();
-        char *pid_s = crm_itoa(pid);
-
-        send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
-        free(pid_s);
-        goto done;
-    }
-
-    if (msg->header.id != crm_class_members) {
-        crm_get_peer(msg->sender.id, msg->sender.uname);
-    }
-
-    if (msg->header.id == crm_class_rmpeer) {
-        uint32_t id = crm_int_helper(data, NULL);
-
-        crm_info("Removing peer %s/%u", data, id);
-        reap_crm_member(id, NULL);
-        goto done;
-
-    } else if (is_classic_ais_cluster()) {
-        if (msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) {
-            xmlNode *node = NULL;
-            const char *value = NULL;
-            gboolean quorate = FALSE;
-
-            xml = string2xml(data);
-            if (xml == NULL) {
-                crm_err("Invalid membership update: %s", data);
-                goto badmsg;
-            }
-
-            value = crm_element_value(xml, "quorate");
-            CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:");
-                      goto badmsg);
-            if (crm_is_true(value)) {
-                quorate = TRUE;
-            }
-
-            value = crm_element_value(xml, "id");
-            CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id");
-                      goto badmsg);
-            crm_peer_seq = crm_int_helper(value, NULL);
+        value = crm_element_value(xml, "id");
+        CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id"); return);
+        crm_peer_seq = crm_int_helper(value, NULL);
 
-            if (quorate != crm_have_quorum) {
-                crm_notice("Membership %s: quorum %s", value, quorate ? "acquired" : "lost");
-                crm_have_quorum = quorate;
+        if (quorate != crm_have_quorum) {
+            crm_notice("Membership %s: quorum %s", value, quorate ? "acquired" : "lost");
+            crm_have_quorum = quorate;
 
-            } else {
-                crm_info("Membership %s: quorum %s", value, quorate ? "retained" : "still lost");
-            }
-
-            for (node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) {
-                crm_update_ais_node(node, crm_peer_seq);
-            }
+        } else {
+            crm_info("Membership %s: quorum %s", value, quorate ? "retained" : "still lost");
         }
-    }
 
-    crm_trace("Payload: %s", data);
-    if (dispatch != NULL) {
-        dispatch(msg->header.id, msg->sender.uname, data);
+        for (member = __xml_first_child(xml); member != NULL; member = __xml_next(member)) {
+            const char *id_s = crm_element_value(member, "id");
+            const char *addr = crm_element_value(member, "addr");
+            const char *uname = crm_element_value(member, "uname");
+            const char *state = crm_element_value(member, "state");
+            const char *born_s = crm_element_value(member, "born");
+            const char *seen_s = crm_element_value(member, "seen");
+            const char *votes_s = crm_element_value(member, "votes");
+            const char *procs_s = crm_element_value(member, "processes");
+
+            int votes = crm_int_helper(votes_s, NULL);
+            unsigned int id = crm_int_helper(id_s, NULL);
+            unsigned int procs = crm_int_helper(procs_s, NULL);
+
+            /* TODO: These values will contain garbage if version < 0.7.1 */
+            uint64_t born = crm_int_helper(born_s, NULL);
+            uint64_t seen = crm_int_helper(seen_s, NULL);
+
+            crm_update_peer(__FUNCTION__, id, born, seen, votes, procs, uname, uname, addr, state);
+        }
     }
-
-  done:
-    free(uncompressed);
-    free_xml(xml);
-    return TRUE;
-
-  badmsg:
-    crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
-            " min=%d, total=%d, size=%d, bz2_size=%d",
-            msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
-            ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
-            msg->sender.pid, (int)sizeof(AIS_Message),
-            msg->header.size, msg->size, msg->compressed_size);
-    goto done;
 }
 
 int
-ais_dispatch(gpointer user_data)
+plugin_dispatch(gpointer user_data)
 {
     int rc = CS_OK;
-    gboolean good = TRUE;
-
-    gboolean(*dispatch) (int kind, const char *from, const char *data) = user_data;
+    crm_cluster_t *cluster = (crm_cluster_t *) user_data;
 
     do {
         char *buffer = NULL;
@@ -743,20 +274,20 @@ ais_dispatch(gpointer user_data)
             /* NULL is a legal "no message afterall" value */
             return 0;
         }
-        good = ais_dispatch_message((AIS_Message *) buffer, dispatch);
+        /*
+        cpg_deliver_fn_t(cpg_handle_t handle, const struct cpg_name *group_name,
+                         uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
+        */
+        cluster->cpg.cpg_deliver_fn(0, NULL, 0, 0, buffer, 0);
         coroipcc_dispatch_put(ais_ipc_handle);
 
-    } while (good && ais_ipc_handle);
-
-    if (good) {
-        return 0;
-    }
+    } while (ais_ipc_handle);
 
-    return -1;
+    return 0;
 }
 
 static void
-ais_destroy(gpointer user_data)
+plugin_destroy(gpointer user_data)
 {
     crm_err("AIS connection terminated");
     ais_fd_sync = -1;
@@ -896,179 +427,6 @@ init_cman_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*
 }
 
 #  ifdef SUPPORT_COROSYNC
-gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
-static bool cpg_evicted = FALSE;
-
-static int
-pcmk_cpg_dispatch(gpointer user_data)
-{
-    int rc = 0;
-
-    pcmk_cpg_dispatch_fn = user_data;
-    rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL);
-    if (rc != CS_OK) {
-        crm_err("Connection to the CPG API failed: %d", rc);
-        pcmk_cpg_handle = 0;
-        return -1;
-
-    } else if(cpg_evicted) {
-        crm_err("Evicted from CPG membership");
-        return -1;
-    }
-    return 0;
-}
-
-static void
-pcmk_cpg_deliver(cpg_handle_t handle,
-                 const struct cpg_name *groupName,
-                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
-{
-    AIS_Message *ais_msg = (AIS_Message *) msg;
-    uint32_t local_nodeid = get_local_nodeid(handle);
-    const char *local_name = get_local_node_name();
-
-    if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) {
-        crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id);
-        return;
-
-    } else if (ais_msg->host.id != 0 && (local_nodeid != ais_msg->host.id)) {
-        /* Not for us */
-        return;
-
-    } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, local_name)) {
-        /* Not for us */
-        return;
-    }
-
-    ais_msg->sender.id = nodeid;
-    if (ais_msg->sender.size == 0) {
-        crm_node_t *peer = crm_get_peer(nodeid, NULL);
-
-        if (peer == NULL) {
-            crm_err("Peer with nodeid=%u is unknown", nodeid);
-
-        } else if (peer->uname == NULL) {
-            crm_err("No uname for peer with nodeid=%u", nodeid);
-
-        } else {
-            crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
-            ais_msg->sender.size = strlen(peer->uname);
-            memset(ais_msg->sender.uname, 0, MAX_NAME);
-            memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size);
-        }
-    }
-
-    ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn);
-}
-
-static void
-pcmk_cpg_membership(cpg_handle_t handle,
-                    const struct cpg_name *groupName,
-                    const struct cpg_address *member_list, size_t member_list_entries,
-                    const struct cpg_address *left_list, size_t left_list_entries,
-                    const struct cpg_address *joined_list, size_t joined_list_entries)
-{
-    int i;
-    gboolean found = FALSE;
-    static int counter = 0;
-    uint32_t local_nodeid = get_local_nodeid(handle);
-
-    for (i = 0; i < left_list_entries; i++) {
-        crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL);
-
-        crm_info("Left[%d.%d] %s.%u ", counter, i, groupName->value, left_list[i].nodeid);
-        crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
-    }
-
-    for (i = 0; i < joined_list_entries; i++) {
-        crm_info("Joined[%d.%d] %s.%u ", counter, i, groupName->value, joined_list[i].nodeid);
-    }
-
-    for (i = 0; i < member_list_entries; i++) {
-        crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
-
-        crm_info("Member[%d.%d] %s.%u ", counter, i, groupName->value, member_list[i].nodeid);
-        crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
-        if (local_nodeid == member_list[i].nodeid) {
-            found = TRUE;
-        }
-    }
-
-    if (!found) {
-        crm_err("We're not part of CPG group %s anymore!", groupName->value);
-        cpg_evicted = TRUE;
-    }
-
-    counter++;
-}
-
-cpg_callbacks_t cpg_callbacks = {
-    .cpg_deliver_fn = pcmk_cpg_deliver,
-    .cpg_confchg_fn = pcmk_cpg_membership,
-};
-#  endif
-
-static gboolean
-init_cpg_connection(crm_cluster_t * cluster)
-{
-#  ifdef SUPPORT_COROSYNC
-    int rc = -1;
-    int fd = 0;
-    int retries = 0;
-    crm_node_t *peer = NULL;
-
-    struct mainloop_fd_callbacks cpg_fd_callbacks = {
-        .dispatch = pcmk_cpg_dispatch,
-        .destroy = cluster->destroy,
-    };
-
-    cpg_evicted = FALSE;
-    strcpy(pcmk_cpg_group.value, crm_system_name);
-    pcmk_cpg_group.length = strlen(crm_system_name) + 1;
-
-    cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks));
-    if (rc != CS_OK) {
-        crm_err("Could not connect to the Cluster Process Group API: %d\n", rc);
-        goto bail;
-    }
-
-    retries = 0;
-    cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)&cluster->nodeid));
-    if (rc != CS_OK) {
-        crm_err("Could not get local node id from the CPG API");
-        goto bail;
-    }
-
-    retries = 0;
-    cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group));
-    if (rc != CS_OK) {
-        crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
-        goto bail;
-    }
-
-    rc = cpg_fd_get(pcmk_cpg_handle, &fd);
-    if (rc != CS_OK) {
-        crm_err("Could not obtain the CPG API connection: %d\n", rc);
-        goto bail;
-    }
-
-    mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster->cs_dispatch, &cpg_fd_callbacks);
-
-  bail:
-    if (rc != CS_OK) {
-        cpg_finalize(pcmk_cpg_handle);
-        return FALSE;
-    }
-
-    peer = crm_get_peer(cluster->nodeid, NULL);
-    crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
-
-#  else
-    crm_err("The Corosync CPG API is not supported in this build");
-    crm_exit(DAEMON_RESPAWN_STOP);
-#  endif
-    return TRUE;
-}
 
 gboolean
 init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean),
@@ -1086,9 +444,11 @@ init_cs_connection_classic(crm_cluster_t * cluster)
     int pid = 0;
     char *pid_s = NULL;
     const char *name = NULL;
+    crm_node_t *peer = NULL;
+    enum crm_proc_flag proc = 0;
 
     struct mainloop_fd_callbacks ais_fd_callbacks = {
-        .dispatch = ais_dispatch,
+        .dispatch = plugin_dispatch,
         .destroy = cluster->destroy,
     };
 
@@ -1099,7 +459,7 @@ init_cs_connection_classic(crm_cluster_t * cluster)
     if (ais_ipc_handle) {
         coroipcc_fd_get(ais_ipc_handle, &ais_fd_async);
     } else {
-        crm_info("Connection to our AIS plugin (%d) failed: %s (%d)",
+        crm_info("Connection to our Corosync plugin (%d) failed: %s (%d)",
                  PCMK_SERVICE_ID, strerror(errno), errno);
         return FALSE;
     }
@@ -1108,7 +468,7 @@ init_cs_connection_classic(crm_cluster_t * cluster)
         rc = CS_ERR_LIBRARY;
     }
     if (rc != CS_OK) {
-        crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID,
+        crm_info("Connection to our Corosync plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID,
                  ais_error2text(rc), rc);
     }
 
@@ -1117,16 +477,15 @@ init_cs_connection_classic(crm_cluster_t * cluster)
     }
 
     if (ais_fd_callbacks.destroy == NULL) {
-        ais_fd_callbacks.destroy = ais_destroy;
+        ais_fd_callbacks.destroy = plugin_destroy;
     }
 
-    mainloop_add_fd("corosync-plugin", G_PRIORITY_MEDIUM, ais_fd_async, cluster->cs_dispatch,
-                    &ais_fd_callbacks);
+    mainloop_add_fd("corosync-plugin", G_PRIORITY_MEDIUM, ais_fd_async, cluster, &ais_fd_callbacks);
     crm_info("AIS connection established");
 
     pid = getpid();
     pid_s = crm_itoa(pid);
-    send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
+    send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
     free(pid_s);
 
     cluster->nodeid = get_local_nodeid(0);
@@ -1141,6 +500,9 @@ init_cs_connection_classic(crm_cluster_t * cluster)
         crm_exit(ENOTUNIQ);
     }
 
+    proc = text2proc(crm_system_name);
+    peer = crm_get_peer(cluster->nodeid, cluster->uname);
+    crm_update_peer_proc(__FUNCTION__, peer, proc|crm_proc_plugin, ONLINESTATUS);
 
     return TRUE;
 }
@@ -1275,7 +637,7 @@ init_cs_connection_once(crm_cluster_t * cluster)
             }
             break;
         case pcmk_cluster_cman:
-            if (init_cpg_connection(cluster) == FALSE) {
+            if (cluster_connect_cpg(cluster) == FALSE) {
                 return FALSE;
             }
             cluster->uname = cman_node_name(0 /* CMAN_NODEID_US */ );
diff --git a/lib/cluster/membership.c b/lib/cluster/membership.c
index a1e044c..875c1c8 100644
--- a/lib/cluster/membership.c
+++ b/lib/cluster/membership.c
@@ -125,7 +125,7 @@ crm_active_peers(void)
     return count;
 }
 
-void
+static void
 destroy_crm_node(gpointer data)
 {
     crm_node_t *node = data;
@@ -143,14 +143,6 @@ destroy_crm_node(gpointer data)
 void
 crm_peer_init(void)
 {
-    static gboolean initialized = FALSE;
-
-    if (initialized) {
-        return;
-    }
-    initialized = TRUE;
-
-    crm_peer_destroy();
     if (crm_peer_cache == NULL) {
         crm_peer_cache = g_hash_table_new_full(crm_str_hash, g_str_equal, free, destroy_crm_node);
     }
diff --git a/lib/common/logging.c b/lib/common/logging.c
index a1b01f2..155a068 100644
--- a/lib/common/logging.c
+++ b/lib/common/logging.c
@@ -95,6 +95,10 @@ crm_glib_handler(const gchar * log_domain, GLogLevelFlags flags, const gchar * m
 static void
 crm_trigger_blackbox(int nsig)
 {
+    if(nsig == SIGTRAP) {
+        /* Turn it on if it wasn't already */
+        crm_enable_blackbox(nsig);
+    }
     crm_write_blackbox(nsig, NULL);
 }
 
@@ -344,15 +348,6 @@ crm_enable_blackbox(int nsig)
 
         crm_update_callsites();
 
-        /* Original meanings from signal(7)
-         *
-         * Signal       Value     Action   Comment
-         * SIGTRAP        5        Core    Trace/breakpoint trap
-         *
-         * Our usage is as similar as possible
-         */
-        mainloop_add_signal(SIGTRAP, crm_trigger_blackbox);
-
         blackbox_trigger = qb_log_custom_open(blackbox_logger, NULL, NULL, NULL);
         qb_log_ctl(blackbox_trigger, QB_LOG_CONF_ENABLED, QB_TRUE);
         crm_trace("Trigger: %d is %d %d", blackbox_trigger,
@@ -762,7 +757,17 @@ crm_log_init(const char *entity, int level, gboolean daemon, gboolean to_stderr,
             }
 #endif
         }
+
+        /* Original meanings from signal(7)
+         *
+         * Signal       Value     Action   Comment
+         * SIGTRAP        5        Core    Trace/breakpoint trap
+         * SIGUSR1     30,10,16    Term    User-defined signal 1
+         *
+         * Our usage is as similar as possible
+         */
         mainloop_add_signal(SIGUSR1, crm_enable_blackbox);
+        mainloop_add_signal(SIGTRAP, crm_trigger_blackbox);
     }
 
     crm_xml_init(); /* Sets buffer allocation strategy */
diff --git a/lib/services/systemd.c b/lib/services/systemd.c
index 886cb35..2a66da5 100644
--- a/lib/services/systemd.c
+++ b/lib/services/systemd.c
@@ -407,6 +407,8 @@ systemd_unit_exec_done(GObject * source_object, GAsyncResult * res, gpointer use
     }
 }
 
+#define SYSTEMD_OVERRIDE_ROOT "/run/systemd/system/"
+
 gboolean
 systemd_unit_exec(svc_action_t * op, gboolean synchronous)
 {
@@ -453,9 +455,42 @@ systemd_unit_exec(svc_action_t * op, gboolean synchronous)
         goto cleanup;
 
     } else if (g_strcmp0(action, "start") == 0) {
+        FILE *file_strm = NULL;
+        char *override_dir = g_strdup_printf("%s/%s", SYSTEMD_OVERRIDE_ROOT, unit);
+        char *override_file = g_strdup_printf("%s/50-pacemaker.conf", override_dir);
+
         action = "StartUnit";
+        crm_build_path(override_dir, 0755);
+
+        file_strm = fopen(override_file, "w");
+        if (file_strm != NULL) {
+            int rc = fprintf(file_strm, "[Service]\nRestart=no");
+            if (rc < 0) {
+                crm_perror(LOG_ERR, "Cannot write to systemd override file %s: %s (%d)", override_file, pcmk_strerror(errno), errno);
+            }
+
+        } else {
+            crm_err("Cannot open systemd override file %s for writing: %s (%d)", override_file, pcmk_strerror(errno), errno);
+        }
+
+        if (file_strm != NULL) {
+            fflush(file_strm);
+            fclose(file_strm);
+        }
+        systemd_daemon_reload(systemd_proxy, &error);
+        g_error_free(error); error = NULL;
+        free(override_file);
+        free(override_dir);
+
     } else if (g_strcmp0(action, "stop") == 0) {
+        char *override_file = g_strdup_printf("%s/%s/50-pacemaker.conf", SYSTEMD_OVERRIDE_ROOT, unit);
+
         action = "StopUnit";
+        unlink(override_file);
+        free(override_file);
+        systemd_daemon_reload(systemd_proxy, &error);
+        g_error_free(error); error = NULL;
+
     } else if (g_strcmp0(action, "restart") == 0) {
         action = "RestartUnit";
     } else {
diff --git a/lrmd/Makefile.am b/lrmd/Makefile.am
index 73f1d7e..82cb65f 100644
--- a/lrmd/Makefile.am
+++ b/lrmd/Makefile.am
@@ -27,7 +27,7 @@ initdir		 = $(INITDIR)
 init_SCRIPTS	 = pacemaker_remote
 sbin_PROGRAMS	 = pacemaker_remoted
 
-if HAVE_SYSTEMD
+if BUILD_SYSTEMD
 systemdunit_DATA = pacemaker_remote.service
 endif
 
diff --git a/mcp/Makefile.am b/mcp/Makefile.am
index 73a71c4..f98f286 100644
--- a/mcp/Makefile.am
+++ b/mcp/Makefile.am
@@ -29,7 +29,7 @@ if BUILD_HELP
 man8_MANS =	$(sbin_PROGRAMS:%=%.8)
 endif
 
-if HAVE_SYSTEMD
+if BUILD_SYSTEMD
 systemdunit_DATA = pacemaker.service
 endif
 
diff --git a/mcp/corosync.c b/mcp/corosync.c
index 64d6eb5..ca37871 100644
--- a/mcp/corosync.c
+++ b/mcp/corosync.c
@@ -43,13 +43,7 @@
 #  include <corosync/cmap.h>
 #endif
 
-static struct cpg_name cpg_group = {
-    .length = 0,
-    .value[0] = 0,
-};
-
 enum cluster_type_e stack = pcmk_cluster_unknown;
-static cpg_handle_t cpg_handle;
 static corosync_cfg_handle_t cfg_handle;
 
 /* =::=::=::= CFG - Shutdown stuff =::=::=::= */
@@ -155,169 +149,6 @@ cluster_connect_cfg(uint32_t * nodeid)
     return FALSE;
 }
 
-/* =::=::=::= CPG - Closed Process Group Messaging =::=::=::= */
-
-static int
-pcmk_cpg_dispatch(gpointer user_data)
-{
-    cpg_handle_t *handle = (cpg_handle_t *) user_data;
-    cs_error_t rc = cpg_dispatch(*handle, CS_DISPATCH_ALL);
-
-    if (rc != CS_OK) {
-        return -1;
-    }
-    return 0;
-}
-
-static void
-cpg_connection_destroy(gpointer user_data)
-{
-    crm_err("Connection destroyed");
-    cpg_handle = 0;
-    crm_exit(ENOTCONN);
-}
-
-static void
-pcmk_cpg_deliver(cpg_handle_t handle,
-                 const struct cpg_name *groupName,
-                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
-{
-    if (nodeid != local_nodeid) {
-        uint32_t procs = 0;
-        xmlNode *xml = string2xml(msg);
-        const char *uname = crm_element_value(xml, "uname");
-
-        crm_element_value_int(xml, "proclist", (int *)&procs);
-        /* crm_debug("Got proclist %.32x from %s", procs, uname); */
-        if (update_node_processes(nodeid, uname, procs)) {
-            update_process_clients();
-        }
-    }
-}
-
-static void
-pcmk_cpg_membership(cpg_handle_t handle,
-                    const struct cpg_name *groupName,
-                    const struct cpg_address *member_list, size_t member_list_entries,
-                    const struct cpg_address *left_list, size_t left_list_entries,
-                    const struct cpg_address *joined_list, size_t joined_list_entries)
-{
-    /* Don't care about CPG membership */
-    update_process_peers();
-}
-
-cpg_callbacks_t cpg_callbacks = {
-    .cpg_deliver_fn = pcmk_cpg_deliver,
-    .cpg_confchg_fn = pcmk_cpg_membership,
-};
-
-gboolean
-cluster_disconnect_cpg(void)
-{
-    if (cpg_handle) {
-        cpg_finalize(cpg_handle);
-        cpg_handle = 0;
-    }
-    return TRUE;
-}
-
-gboolean
-cluster_connect_cpg(void)
-{
-    cs_error_t rc;
-    unsigned int nodeid;
-    int fd;
-    int retries = 0;
-
-    static struct mainloop_fd_callbacks cpg_fd_callbacks = {
-        .dispatch = pcmk_cpg_dispatch,
-        .destroy = cpg_connection_destroy,
-    };
-
-    strcpy(cpg_group.value, "pcmk");
-    cpg_group.length = strlen(cpg_group.value) + 1;
-
-    retries = 0;
-    cs_repeat(retries, 30, rc = cpg_initialize(&cpg_handle, &cpg_callbacks));
-    if (rc != CS_OK) {
-        crm_err("corosync cpg init error %d", rc);
-        return FALSE;
-    }
-
-    rc = cpg_fd_get(cpg_handle, &fd);
-    if (rc != CS_OK) {
-        crm_err("corosync cpg fd_get error %d", rc);
-        goto bail;
-    }
-
-    retries = 0;
-    cs_repeat(retries, 30, rc = cpg_local_get(cpg_handle, &nodeid));
-    if (rc != CS_OK) {
-        crm_err("corosync cpg local_get error %d", rc);
-        goto bail;
-    }
-
-    crm_debug("Our nodeid: %d", nodeid);
-
-    retries = 0;
-    cs_repeat(retries, 30, rc = cpg_join(cpg_handle, &cpg_group));
-
-    if (rc != CS_OK) {
-        crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
-        goto bail;
-    }
-
-    mainloop_add_fd("corosync-cpg", G_PRIORITY_DEFAULT, fd, &cpg_handle, &cpg_fd_callbacks);
-    return TRUE;
-
-  bail:
-    cpg_finalize(cpg_handle);
-    return FALSE;
-}
-
-gboolean
-send_cpg_message(struct iovec * iov)
-{
-    int rc = CS_OK;
-    int retries = 0;
-
-    errno = 0;
-
-    do {
-        rc = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 1);
-        if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
-            cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED;
-            int rc2 = cpg_flow_control_state_get(cpg_handle, &fc_state);
-
-            if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
-                crm_debug("Attempting to clear cpg dispatch queue");
-                rc2 = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL);
-            }
-
-            if (rc2 != CS_OK) {
-                crm_warn("Could not check/clear the cpg connection");
-                goto bail;
-
-            } else {
-                retries++;
-                crm_debug("Retrying operation after %ds", retries);
-                sleep(retries);
-            }
-        } else {
-            break;
-        }
-
-        /* 5 retires is plenty, we'll resend once the membership reforms anyway */
-    } while (retries < 5);
-
-  bail:
-    if (rc != CS_OK) {
-        crm_err("Sending message via cpg FAILED: (rc=%d) %s", rc, ais_error2text(rc));
-    }
-
-    return (rc == CS_OK);
-}
-
 /* =::=::=::= Configuration =::=::=::= */
 #if HAVE_CONFDB
 static int
@@ -447,7 +278,7 @@ read_config(void)
 
 #if HAVE_CONFDB
     char *value = NULL;
-    confdb_handle_t config;
+    confdb_handle_t config = 0;
     confdb_handle_t top_handle = 0;
     hdb_handle_t local_handle;
     static confdb_callbacks_t callbacks = { };
@@ -456,7 +287,8 @@ read_config(void)
         rc = confdb_initialize(&config, &callbacks);
         if (rc != CS_OK) {
             retries++;
-            printf("Connection setup failed: %d.  Retrying in %ds\n", rc, retries);
+            printf("confdb connection setup failed: %s.  Retrying in %ds\n", ais_error2text(rc), retries);
+            crm_info("confdb connection setup failed: %s.  Retrying in %ds", ais_error2text(rc), retries);
             sleep(retries);
 
         } else {
@@ -473,8 +305,8 @@ read_config(void)
         rc = cmap_initialize(&local_handle);
         if (rc != CS_OK) {
             retries++;
-            printf("API connection setup failed: %s.  Retrying in %ds\n", cs_strerror(rc), retries);
-            crm_info("API connection setup failed: %s.  Retrying in %ds", cs_strerror(rc), retries);
+            printf("cmap connection setup failed: %s.  Retrying in %ds\n", cs_strerror(rc), retries);
+            crm_info("cmap connection setup failed: %s.  Retrying in %ds", cs_strerror(rc), retries);
             sleep(retries);
 
         } else {
diff --git a/mcp/pacemaker.c b/mcp/pacemaker.c
index 47fdd68..6f8d9b9 100644
--- a/mcp/pacemaker.c
+++ b/mcp/pacemaker.c
@@ -29,6 +29,7 @@
 #include <crm/msg_xml.h>
 #include <crm/common/ipcs.h>
 #include <crm/common/mainloop.h>
+#include <crm/cluster/internal.h>
 #include <crm/cluster.h>
 
 #include <dirent.h>
@@ -44,22 +45,6 @@ uint32_t local_nodeid = 0;
 crm_trigger_t *shutdown_trigger = NULL;
 const char *pid_file = "/var/run/pacemaker.pid";
 
-/* *INDENT-OFF* */
-enum crm_proc_flag {
-    crm_proc_none       = 0x00000001,
-    crm_proc_plugin        = 0x00000002,
-    crm_proc_lrmd       = 0x00000010,
-    crm_proc_cib        = 0x00000100,
-    crm_proc_crmd       = 0x00000200,
-    crm_proc_attrd      = 0x00001000,
-    crm_proc_stonithd   = 0x00002000,
-    crm_proc_pe         = 0x00010000,
-    crm_proc_te         = 0x00020000,
-    crm_proc_mgmtd      = 0x00040000,
-    crm_proc_stonith_ng = 0x00100000,
-};
-/* *INDENT-ON* */
-
 typedef struct pcmk_child_s {
     int pid;
     long flag;
@@ -539,8 +524,10 @@ update_process_clients(void)
 void
 update_process_peers(void)
 {
+    /* Do nothing for corosync-2 based clusters */
+
     char buffer[1024];
-    struct iovec iov;
+    struct iovec *iov;
     int rc = 0;
 
     memset(buffer, 0, SIZEOF(buffer));
@@ -552,11 +539,11 @@ update_process_peers(void)
         rc = snprintf(buffer, SIZEOF(buffer) - 1, "<node proclist=\"%u\"/>", get_process_list());
     }
 
-    iov.iov_base = buffer;
-    iov.iov_len = rc + 1;
-
     crm_trace("Sending %s", buffer);
-    send_cpg_message(&iov);
+    iov = calloc(1, sizeof(struct iovec));
+    iov->iov_base = strdup(buffer);
+    iov->iov_len = rc + 1;
+    send_cpg_iov(iov);
 }
 
 gboolean
@@ -619,6 +606,7 @@ update_node_processes(uint32_t id, const char *uname, uint32_t procs)
     return changed;
 }
 
+
 /* *INDENT-OFF* */
 static struct crm_option long_options[] = {
     /* Top-level Options */
@@ -779,6 +767,42 @@ init_children_processes(void)
     }
 }
 
+static void
+mcp_cpg_destroy(gpointer user_data)
+{
+    crm_err("Connection destroyed");
+    crm_exit(ENOTCONN);
+}
+
+static void
+mcp_cpg_deliver(cpg_handle_t handle,
+                 const struct cpg_name *groupName,
+                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+{
+    if (nodeid != local_nodeid) {
+        uint32_t procs = 0;
+        xmlNode *xml = string2xml(msg);
+        const char *uname = crm_element_value(xml, "uname");
+
+        crm_element_value_int(xml, "proclist", (int *)&procs);
+        /* crm_debug("Got proclist %.32x from %s", procs, uname); */
+        if (update_node_processes(nodeid, uname, procs)) {
+            update_process_clients();
+        }
+    }
+}
+
+static void
+mcp_cpg_membership(cpg_handle_t handle,
+                    const struct cpg_name *groupName,
+                    const struct cpg_address *member_list, size_t member_list_entries,
+                    const struct cpg_address *left_list, size_t left_list_entries,
+                    const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+    /* Don't care about CPG membership, but we do want to broadcast our own presence */
+    update_process_peers();
+}
+
 int
 main(int argc, char **argv)
 {
@@ -795,6 +819,7 @@ main(int argc, char **argv)
     crm_ipc_t *old_instance = NULL;
     qb_ipcs_service_t *ipcs = NULL;
     const char *facility = daemon_option("logfacility");
+    static crm_cluster_t cluster;
 
     setenv("LC_ALL", "C", 1);
     setenv("HA_LOGD", "no", 1);
@@ -951,12 +976,17 @@ main(int argc, char **argv)
         crm_exit(EIO);
     }
 
+    /* Allows us to block shutdown */
     if (cluster_connect_cfg(&local_nodeid) == FALSE) {
         crm_err("Couldn't connect to Corosync's CFG service");
         crm_exit(ENOPROTOOPT);
     }
 
-    if (cluster_connect_cpg() == FALSE) {
+    cluster.destroy = mcp_cpg_destroy;
+    cluster.cpg.cpg_deliver_fn = mcp_cpg_deliver;
+    cluster.cpg.cpg_confchg_fn = mcp_cpg_membership;
+
+    if(cluster_connect_cpg(&cluster) == FALSE) {
         crm_err("Couldn't connect to Corosync's CPG service");
         crm_exit(ENOPROTOOPT);
     }
@@ -982,7 +1012,7 @@ main(int argc, char **argv)
 
     g_main_destroy(mainloop);
 
-    cluster_disconnect_cpg();
+    cluster_disconnect_cpg(&cluster);
     cluster_disconnect_cfg();
 
     crm_info("Exiting %s", crm_system_name);
diff --git a/mcp/pacemaker.h b/mcp/pacemaker.h
index 224df93..8967966 100644
--- a/mcp/pacemaker.h
+++ b/mcp/pacemaker.h
@@ -41,20 +41,16 @@ typedef struct pcmk_peer_s {
     char *uname;
 } pcmk_peer_t;
 
-extern gboolean read_config(void);
+gboolean read_config(void);
 
-extern gboolean cluster_connect_cfg(uint32_t * nodeid);
-extern gboolean cluster_disconnect_cfg(void);
+gboolean cluster_connect_cfg(uint32_t * nodeid);
+gboolean cluster_disconnect_cfg(void);
 
-extern gboolean cluster_connect_cpg(void);
-extern gboolean cluster_disconnect_cpg(void);
-extern gboolean send_cpg_message(struct iovec *iov);
+void update_process_clients(void);
+void update_process_peers(void);
+gboolean update_node_processes(uint32_t node, const char *uname, uint32_t procs);
 
-extern void update_process_clients(void);
-extern void update_process_peers(void);
-extern gboolean update_node_processes(uint32_t node, const char *uname, uint32_t procs);
+void enable_mgmtd(gboolean enable);
+void enable_crmd_as_root(gboolean enable);
 
-extern void enable_mgmtd(gboolean enable);
-extern void enable_crmd_as_root(gboolean enable);
-
-extern void pcmk_shutdown(int nsig);
+void pcmk_shutdown(int nsig);
diff --git a/mcp/pacemaker.in b/mcp/pacemaker.in
index a6647fe..c96f1d1 100644
--- a/mcp/pacemaker.in
+++ b/mcp/pacemaker.in
@@ -111,6 +111,7 @@ cman_pre_start()
     pid=$(pidof corosync 2>/dev/null)
     if [ $? -ne 0 ]; then
 	service cman start
+	sleep 2
     fi
 }
 
diff --git a/tools/attrd.c b/tools/attrd.c
index 1e834ea..2d485f9 100644
--- a/tools/attrd.c
+++ b/tools/attrd.c
@@ -325,11 +325,19 @@ attrd_ha_callback(HA_Message * msg, void *private_data)
 #endif
 
 #if SUPPORT_COROSYNC
-static gboolean
-attrd_ais_dispatch(int kind, const char *from, const char *data)
+static void
+attrd_cs_dispatch(cpg_handle_t handle,
+                 const struct cpg_name *groupName,
+                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 {
+    uint32_t kind = 0;
     xmlNode *xml = NULL;
+    const char *from = NULL;
+    char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
 
+    if(data == NULL) {
+        return;
+    }
     if (kind == crm_class_cluster) {
         xml = string2xml(data);
         if (xml == NULL) {
@@ -360,11 +368,11 @@ attrd_ais_dispatch(int kind, const char *from, const char *data)
         free_xml(xml);
     }
 
-    return TRUE;
+    free(data);
 }
 
 static void
-attrd_ais_destroy(gpointer unused)
+attrd_cs_destroy(gpointer unused)
 {
     if (need_shutdown) {
         /* we signed out, so this is expected */
@@ -405,7 +413,7 @@ update_for_hash_entry(gpointer key, gpointer value, gpointer user_data)
 {
     attr_hash_entry_t *entry = value;
 
-    if (entry->value != NULL) {
+    if (entry->value != NULL || entry->stored_value != NULL) {
         attrd_timer_callback(value);
     }
 }
@@ -537,8 +545,9 @@ main(int argc, char **argv)
 
 #if SUPPORT_COROSYNC
         if (is_openais_cluster()) {
-            cluster.destroy = attrd_ais_destroy;
-            cluster.cs_dispatch = attrd_ais_dispatch;
+            cluster.destroy = attrd_cs_destroy;
+            cluster.cpg.cpg_deliver_fn = attrd_cs_dispatch;
+            cluster.cpg.cpg_confchg_fn = pcmk_cpg_membership;
         }
 #endif
 
diff --git a/tools/crm_node.c b/tools/crm_node.c
index a25b3b4..aacea76 100644
--- a/tools/crm_node.c
+++ b/tools/crm_node.c
@@ -500,16 +500,23 @@ crm_add_member(gpointer key, gpointer value, gpointer user_data)
     }
 }
 
-static gboolean
-ais_membership_dispatch(int kind, const char *from, const char *data)
+static void
+ais_membership_dispatch(cpg_handle_t handle,
+                          const struct cpg_name *groupName,
+                          uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 {
+    uint32_t kind = 0;
+    const char *from = NULL;
+    char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
+
     switch (kind) {
         case crm_class_members:
         case crm_class_notify:
         case crm_class_quorum:
             break;
         default:
-            return TRUE;
+            free(data);
+            return;
 
             break;
     }
@@ -548,9 +555,10 @@ ais_membership_dispatch(int kind, const char *from, const char *data)
         fprintf(stdout, "\n");
     }
 
+    free(data);
     crm_exit(pcmk_ok);
 
-    return TRUE;
+    return;
 }
 #endif
 
@@ -695,7 +703,8 @@ try_openais(int command, enum cluster_type_e stack)
     static crm_cluster_t cluster;
 
     cluster.destroy = ais_membership_destroy;
-    cluster.cs_dispatch = ais_membership_dispatch;
+    cluster.cpg.cpg_deliver_fn = ais_membership_dispatch;
+    cluster.cpg.cpg_confchg_fn = NULL;
 
     if (init_cs_connection_once(&cluster)) {
 
@@ -703,7 +712,7 @@ try_openais(int command, enum cluster_type_e stack)
 
         switch (command) {
             case 'R':
-                send_ais_text(crm_class_rmpeer, target_uname, TRUE, NULL, crm_msg_ais);
+                send_cluster_text(crm_class_rmpeer, target_uname, TRUE, NULL, crm_msg_ais);
                 cib_remove_node(0, target_uname);
                 crm_exit(pcmk_ok);
 
@@ -713,13 +722,13 @@ try_openais(int command, enum cluster_type_e stack)
                 crm_exit(pcmk_ok);
 
             case 'q':
-                send_ais_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais);
+                send_cluster_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais);
                 break;
 
             case 'l':
             case 'p':
                 crm_info("Requesting the list of configured nodes");
-                send_ais_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais);
+                send_cluster_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais);
                 break;
 
             case 'i':