Blob Blame History Raw
From bc0f05e6f3a3c93c853ceffd1f6d2022dc30fb77 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= <edvin.torok@citrix.com>
Date: Wed, 12 Oct 2022 19:13:04 +0100
Subject: tools/ocaml: Limit maximum in-flight requests / outstanding replies
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Introduce a limit on the number of outstanding reply packets in the xenbus
queue.  This limits the number of in-flight requests: when the output queue is
full we'll stop processing inputs until the output queue has room again.

To avoid a busy loop on the Unix socket we only add it to the watched input
file descriptor set if we'd be able to call `input` on it.  Even though Dom0
is trusted and exempt from quotas a flood of events might cause a backlog
where events are produced faster than daemons in Dom0 can consume them, which
could lead to an unbounded queue size and OOM.

Therefore the xenbus queue limit must apply to all connections, Dom0 is not
exempt from it, although if everything works correctly it will eventually
catch up.

This prevents a malicious guest from sending more commands while it has
outstanding watch events or command replies in its input ring.  However if it
can cause the generation of watch events by other means (e.g. by Dom0, or
another cooperative guest) and stop reading its own ring then watch events
would've queued up without limit.

The xenstore protocol doesn't have a back-pressure mechanism, and doesn't
allow dropping watch events.  In fact, dropping watch events is known to break
some pieces of normal functionality.  This leaves little choice to safely
implement the xenstore protocol without exposing the xenstore daemon to
out-of-memory attacks.

Implement the fix as pipes with bounded buffers:
* Use a bounded buffer for watch events
* The watch structure will have a bounded receiving pipe of watch events
* The source will have an "overflow" pipe of pending watch events it couldn't
  deliver

Items are queued up on one end and are sent as far along the pipe as possible:

  source domain -> watch -> xenbus of target -> xenstore ring/socket of target

If the pipe is "full" at any point then back-pressure is applied and we prevent
more items from being queued up.  For the source domain this means that we'll
stop accepting new commands as long as its pipe buffer is not empty.

Before we try to enqueue an item we first check whether it is possible to send
it further down the pipe, by attempting to recursively flush the pipes. This
ensures that we retain the order of events as much as possible.

We might break causality of watch events if the target domain's queue is full
and we need to start using the watch's queue.  This is a breaking change in
the xenstore protocol, but only for domains which are not processing their
incoming ring as expected.

When a watch is deleted its entire pending queue is dropped (no code is needed
for that, because it is part of the 'watch' type).

There is a cache of watches that have pending events that we attempt to flush
at every cycle if possible.

Introduce 3 limits here:
* quota-maxwatchevents on watch event destination: when this is hit the
  source will not be allowed to queue up more watch events.
* quota-maxoustanding which is the number of responses not read from the ring:
  once exceeded, no more inputs are processed until all outstanding replies
  are consumed by the client.
* overflow queue on the watch event source: all watches that cannot be stored
  on destination are queued up here, a single command can trigger multiple
  watches (e.g. due to recursion).

The overflow queue currently doesn't have an upper bound, it is difficult to
accurately calculate one as it depends on whether you are Dom0 and how many
watches each path has registered and how many watch events you can trigger
with a single command (e.g. a commit).  However these events were already
using memory, this just moves them elsewhere, and as long as we correctly
block a domain it shouldn't result in unbounded memory usage.

Note that Dom0 is not excluded from these checks, it is important that Dom0 is
especially not excluded when it is the source, since there are many ways in
which a guest could trigger Dom0 to send it watch events.

This should protect against malicious frontends as long as the backend follows
the PV xenstore protocol and only exposes paths needed by the frontend, and
changes those paths at most once as a reaction to guest events, or protocol
state.

The queue limits are per watch, and per domain-pair, so even if one
communication channel would be "blocked", others would keep working, and the
domain itself won't get blocked as long as it doesn't overflow the queue of
watch events.

Similarly a malicious backend could cause the frontend to get blocked, but
this watch queue protects the frontend as well as long as it follows the PV
protocol.  (Although note that protection against malicious backends is only a
best effort at the moment)

This is part of XSA-326 / CVE-2022-42318.

Reported-by: Julien Grall <jgrall@amazon.com>
Signed-off-by: Edwin Török <edvin.torok@citrix.com>
Acked-by: Christian Lindig <christian.lindig@citrix.com>

diff --git a/tools/ocaml/libs/xb/xb.ml b/tools/ocaml/libs/xb/xb.ml
index 4197a3888a68..b292ed7a874d 100644
--- a/tools/ocaml/libs/xb/xb.ml
+++ b/tools/ocaml/libs/xb/xb.ml
@@ -134,14 +134,44 @@ type backend = Fd of backend_fd | Xenmmap of backend_mmap
 
 type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
 
+(*
+	separate capacity reservation for replies and watch events:
+	this allows a domain to keep working even when under a constant flood of
+	watch events
+*)
+type capacity = { maxoutstanding: int; maxwatchevents: int }
+
+module Queue = BoundedQueue
+
+type packet_class =
+	| CommandReply
+	| Watchevent
+
+let string_of_packet_class = function
+	| CommandReply -> "command_reply"
+	| Watchevent -> "watch_event"
+
 type t =
 {
 	backend: backend;
-	pkt_out: Packet.t Queue.t;
+	pkt_out: (Packet.t, packet_class) Queue.t;
 	mutable partial_in: partial_buf;
 	mutable partial_out: string;
+	capacity: capacity
 }
 
+let to_read con =
+	match con.partial_in with
+		| HaveHdr partial_pkt -> Partial.to_complete partial_pkt
+		| NoHdr   (i, _)    -> i
+
+let debug t =
+	Printf.sprintf "XenBus state: partial_in: %d needed, partial_out: %d bytes, pkt_out: %d packets, %s"
+		(to_read t)
+		(String.length t.partial_out)
+		(Queue.length t.pkt_out)
+		(BoundedQueue.debug string_of_packet_class t.pkt_out)
+
 let init_partial_in () = NoHdr
 	(Partial.header_size (), Bytes.make (Partial.header_size()) '\000')
 
@@ -199,7 +229,8 @@ let output con =
 	let s = if String.length con.partial_out > 0 then
 			con.partial_out
 		else if Queue.length con.pkt_out > 0 then
-			Packet.to_string (Queue.pop con.pkt_out)
+			let pkt = Queue.pop con.pkt_out in
+			Packet.to_string pkt
 		else
 			"" in
 	(* send data from s, and save the unsent data to partial_out *)
@@ -212,12 +243,15 @@ let output con =
 	(* after sending one packet, partial is empty *)
 	con.partial_out = ""
 
+(* we can only process an input packet if we're guaranteed to have room
+   to store the response packet *)
+let can_input con = Queue.can_push con.pkt_out CommandReply
+
 (* NB: can throw Reconnect *)
 let input con =
-	let to_read =
-		match con.partial_in with
-		| HaveHdr partial_pkt -> Partial.to_complete partial_pkt
-		| NoHdr   (i, _)    -> i in
+	if not (can_input con) then None
+	else
+	let to_read = to_read con in
 
 	(* try to get more data from input stream *)
 	let b = Bytes.make to_read '\000' in
@@ -243,11 +277,22 @@ let input con =
 		None
 	)
 
-let newcon backend = {
+let classify t =
+	match t.Packet.ty with
+	| Op.Watchevent -> Watchevent
+	| _ -> CommandReply
+
+let newcon ~capacity backend =
+	let limit = function
+		| CommandReply -> capacity.maxoutstanding
+		| Watchevent -> capacity.maxwatchevents
+	in
+	{
 	backend = backend;
-	pkt_out = Queue.create ();
+	pkt_out = Queue.create ~capacity:(capacity.maxoutstanding + capacity.maxwatchevents) ~classify ~limit;
 	partial_in = init_partial_in ();
 	partial_out = "";
+	capacity = capacity;
 	}
 
 let open_fd fd = newcon (Fd { fd = fd; })
diff --git a/tools/ocaml/libs/xb/xb.mli b/tools/ocaml/libs/xb/xb.mli
index 91c682162cea..71b2754ca788 100644
--- a/tools/ocaml/libs/xb/xb.mli
+++ b/tools/ocaml/libs/xb/xb.mli
@@ -66,10 +66,11 @@ type backend_mmap = {
 type backend_fd = { fd : Unix.file_descr; }
 type backend = Fd of backend_fd | Xenmmap of backend_mmap
 type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
+type capacity = { maxoutstanding: int; maxwatchevents: int }
 type t
 val init_partial_in : unit -> partial_buf
 val reconnect : t -> unit
-val queue : t -> Packet.t -> unit
+val queue : t -> Packet.t -> unit option
 val read_fd : backend_fd -> 'a -> bytes -> int -> int
 val read_mmap : backend_mmap -> 'a -> bytes -> int -> int
 val read : t -> bytes -> int -> int
@@ -78,13 +79,14 @@ val write_mmap : backend_mmap -> 'a -> string -> int -> int
 val write : t -> string -> int -> int
 val output : t -> bool
 val input : t -> Packet.t option
-val newcon : backend -> t
-val open_fd : Unix.file_descr -> t
-val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> t
+val newcon : capacity:capacity -> backend -> t
+val open_fd : Unix.file_descr -> capacity:capacity -> t
+val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> capacity:capacity -> t
 val close : t -> unit
 val is_fd : t -> bool
 val is_mmap : t -> bool
 val output_len : t -> int
+val can_input: t -> bool
 val has_new_output : t -> bool
 val has_old_output : t -> bool
 val has_output : t -> bool
@@ -93,3 +95,4 @@ val has_partial_input : t -> bool
 val has_more_input : t -> bool
 val is_selectable : t -> bool
 val get_fd : t -> Unix.file_descr
+val debug: t -> string
diff --git a/tools/ocaml/libs/xs/queueop.ml b/tools/ocaml/libs/xs/queueop.ml
index 9ff5bbd529ce..4e532cdaeacb 100644
--- a/tools/ocaml/libs/xs/queueop.ml
+++ b/tools/ocaml/libs/xs/queueop.ml
@@ -16,9 +16,10 @@
 open Xenbus
 
 let data_concat ls = (String.concat "\000" ls) ^ "\000"
+let queue con pkt = let r = Xb.queue con pkt in assert (r <> None)
 let queue_path ty (tid: int) (path: string) con =
 	let data = data_concat [ path; ] in
-	Xb.queue con (Xb.Packet.create tid 0 ty data)
+	queue con (Xb.Packet.create tid 0 ty data)
 
 (* operations *)
 let directory tid path con = queue_path Xb.Op.Directory tid path con
@@ -27,48 +28,48 @@ let read tid path con = queue_path Xb.Op.Read tid path con
 let getperms tid path con = queue_path Xb.Op.Getperms tid path con
 
 let debug commands con =
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
+	queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
 
 let watch path data con =
 	let data = data_concat [ path; data; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
 
 let unwatch path data con =
 	let data = data_concat [ path; data; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
 
 let transaction_start con =
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
+	queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
 
 let transaction_end tid commit con =
 	let data = data_concat [ (if commit then "T" else "F"); ] in
-	Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
+	queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
 
 let introduce domid mfn port con =
 	let data = data_concat [ Printf.sprintf "%u" domid;
 	                         Printf.sprintf "%nu" mfn;
 	                         string_of_int port; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
 
 let release domid con =
 	let data = data_concat [ Printf.sprintf "%u" domid; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
 
 let resume domid con =
 	let data = data_concat [ Printf.sprintf "%u" domid; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
 
 let getdomainpath domid con =
 	let data = data_concat [ Printf.sprintf "%u" domid; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
 
 let write tid path value con =
 	let data = path ^ "\000" ^ value (* no NULL at the end *) in
-	Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
+	queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
 
 let mkdir tid path con = queue_path Xb.Op.Mkdir tid path con
 let rm tid path con = queue_path Xb.Op.Rm tid path con
 
 let setperms tid path perms con =
 	let data = data_concat [ path; perms ] in
-	Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
+	queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
diff --git a/tools/ocaml/libs/xs/xsraw.ml b/tools/ocaml/libs/xs/xsraw.ml
index 451f8b38dbcc..cbd17280600c 100644
--- a/tools/ocaml/libs/xs/xsraw.ml
+++ b/tools/ocaml/libs/xs/xsraw.ml
@@ -36,8 +36,10 @@ type con = {
 let close con =
 	Xb.close con.xb
 
+let capacity = { Xb.maxoutstanding = 1; maxwatchevents = 0; }
+
 let open_fd fd = {
-	xb = Xb.open_fd fd;
+	xb = Xb.open_fd ~capacity fd;
 	watchevents = Queue.create ();
 }
 
diff --git a/tools/ocaml/xenstored/connection.ml b/tools/ocaml/xenstored/connection.ml
index cc20e047d2b9..9624a5f9da2c 100644
--- a/tools/ocaml/xenstored/connection.ml
+++ b/tools/ocaml/xenstored/connection.ml
@@ -20,12 +20,84 @@ open Stdext
 
 let xenstore_payload_max = 4096 (* xen/include/public/io/xs_wire.h *)
 
+type 'a bounded_sender = 'a -> unit option
+(** a bounded sender accepts an ['a] item and returns:
+    None - if there is no room to accept the item
+    Some () -  if it has successfully accepted/sent the item
+ *)
+
+module BoundedPipe : sig
+	type 'a t
+
+	(** [create ~capacity ~destination] creates a bounded pipe with a
+	    local buffer holding at most [capacity] items.  Once the buffer is
+	    full it will not accept further items.  items from the pipe are
+	    flushed into [destination] as long as it accepts items.  The
+	    destination could be another pipe.
+	 *)
+	val create: capacity:int -> destination:'a bounded_sender -> 'a t
+
+	(** [is_empty t] returns whether the local buffer of [t] is empty. *)
+	val is_empty : _ t -> bool
+
+	(** [length t] the number of items in the internal buffer *)
+	val length: _ t -> int
+
+	(** [flush_pipe t] sends as many items from the local buffer as possible,
+			which could be none. *)
+	val flush_pipe: _ t -> unit
+
+	(** [push t item] tries to [flush_pipe] and then push [item]
+	    into the pipe if its [capacity] allows.
+	    Returns [None] if there is no more room
+	 *)
+	val push : 'a t -> 'a bounded_sender
+end = struct
+	(* items are enqueued in [q], and then flushed to [connect_to] *)
+	type 'a t =
+		{ q: 'a Queue.t
+		; destination: 'a bounded_sender
+		; capacity: int
+		}
+
+	let create ~capacity ~destination =
+		{ q = Queue.create (); capacity; destination }
+
+	let rec flush_pipe t =
+		if not Queue.(is_empty t.q) then
+			let item = Queue.peek t.q in
+			match t.destination item with
+			| None -> () (* no room *)
+			| Some () ->
+				(* successfully sent item to next stage *)
+				let _ = Queue.pop t.q in
+				(* continue trying to send more items *)
+				flush_pipe t
+
+	let push t item =
+		(* first try to flush as many items from this pipe as possible to make room,
+		   it is important to do this first to preserve the order of the items
+		 *)
+		flush_pipe t;
+		if Queue.length t.q < t.capacity then begin
+			(* enqueue, instead of sending directly.
+			   this ensures that [out] sees the items in the same order as we receive them
+			 *)
+			Queue.push item t.q;
+			Some (flush_pipe t)
+		end else None
+
+	let is_empty t = Queue.is_empty t.q
+	let length t = Queue.length t.q
+end
+
 type watch = {
 	con: t;
 	token: string;
 	path: string;
 	base: string;
 	is_relative: bool;
+	pending_watchevents: Xenbus.Xb.Packet.t BoundedPipe.t;
 }
 
 and t = {
@@ -38,8 +110,36 @@ and t = {
 	anonid: int;
 	mutable stat_nb_ops: int;
 	mutable perm: Perms.Connection.t;
+	pending_source_watchevents: (watch * Xenbus.Xb.Packet.t) BoundedPipe.t
 }
 
+module Watch = struct
+	module T = struct
+		type t = watch
+
+		let compare w1 w2 =
+			(* cannot compare watches from different connections *)
+			assert (w1.con == w2.con);
+			match String.compare w1.token w2.token with
+			| 0 -> String.compare w1.path w2.path
+			| n -> n
+	end
+	module Set = Set.Make(T)
+
+	let flush_events t =
+		BoundedPipe.flush_pipe t.pending_watchevents;
+		not (BoundedPipe.is_empty t.pending_watchevents)
+
+	let pending_watchevents t =
+		BoundedPipe.length t.pending_watchevents
+end
+
+let source_flush_watchevents t =
+	BoundedPipe.flush_pipe t.pending_source_watchevents
+
+let source_pending_watchevents t =
+	BoundedPipe.length t.pending_source_watchevents
+
 let mark_as_bad con =
 	match con.dom with
 	|None -> ()
@@ -67,7 +167,8 @@ let watch_create ~con ~path ~token = {
 	token = token;
 	path = path;
 	base = get_path con;
-	is_relative = path.[0] <> '/' && path.[0] <> '@'
+	is_relative = path.[0] <> '/' && path.[0] <> '@';
+	pending_watchevents = BoundedPipe.create ~capacity:!Define.maxwatchevents ~destination:(Xenbus.Xb.queue con.xb)
 }
 
 let get_con w = w.con
@@ -93,6 +194,9 @@ let make_perm dom =
 	Perms.Connection.create ~perms:[Perms.READ; Perms.WRITE] domid
 
 let create xbcon dom =
+	let destination (watch, pkt) =
+		BoundedPipe.push watch.pending_watchevents pkt
+	in
 	let id =
 		match dom with
 		| None -> let old = !anon_id_next in incr anon_id_next; old
@@ -109,6 +213,16 @@ let create xbcon dom =
 	anonid = id;
 	stat_nb_ops = 0;
 	perm = make_perm dom;
+
+	(* the actual capacity will be lower, this is used as an overflow
+	   buffer: anything that doesn't fit elsewhere gets put here, only
+	   limited by the amount of watches that you can generate with a
+	   single xenstore command (which is finite, although possibly very
+	   large in theory for Dom0).  Once the pipe here has any contents the
+	   domain is blocked from sending more commands until it is empty
+	   again though.
+	 *)
+	pending_source_watchevents = BoundedPipe.create ~capacity:Sys.max_array_length ~destination
 	}
 	in
 	Logging.new_connection ~tid:Transaction.none ~con:(get_domstr con);
@@ -127,11 +241,17 @@ let set_target con target_domid =
 
 let is_backend_mmap con = Xenbus.Xb.is_mmap con.xb
 
-let send_reply con tid rid ty data =
+let packet_of con tid rid ty data =
 	if (String.length data) > xenstore_payload_max && (is_backend_mmap con) then
-		Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000")
+		Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000"
 	else
-		Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid ty data)
+		Xenbus.Xb.Packet.create tid rid ty data
+
+let send_reply con tid rid ty data =
+	let result = Xenbus.Xb.queue con.xb (packet_of con tid rid ty data) in
+	(* should never happen: we only process an input packet when there is room for an output packet *)
+	(* and the limit for replies is different from the limit for watch events *)
+	assert (result <> None)
 
 let send_error con tid rid err = send_reply con tid rid Xenbus.Xb.Op.Error (err ^ "\000")
 let send_ack con tid rid ty = send_reply con tid rid ty "OK\000"
@@ -181,11 +301,11 @@ let del_watch con path token =
 	apath, w
 
 let del_watches con =
-  Hashtbl.clear con.watches;
+  Hashtbl.reset con.watches;
   con.nb_watches <- 0
 
 let del_transactions con =
-  Hashtbl.clear con.transactions
+  Hashtbl.reset con.transactions
 
 let list_watches con =
 	let ll = Hashtbl.fold
@@ -208,21 +328,29 @@ let lookup_watch_perm path = function
 let lookup_watch_perms oldroot root path =
 	lookup_watch_perm path oldroot @ lookup_watch_perm path (Some root)
 
-let fire_single_watch_unchecked watch =
+let fire_single_watch_unchecked source watch =
 	let data = Utils.join_by_null [watch.path; watch.token; ""] in
-	send_reply watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data
+	let pkt = packet_of watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data in
+
+	match BoundedPipe.push source.pending_source_watchevents (watch, pkt) with
+	| Some () -> () (* packet queued *)
+	| None ->
+			(* a well behaved Dom0 shouldn't be able to trigger this,
+			   if it happens it is likely a Dom0 bug causing runaway memory usage
+			 *)
+			failwith "watch event overflow, cannot happen"
 
-let fire_single_watch (oldroot, root) watch =
+let fire_single_watch source (oldroot, root) watch =
 	let abspath = get_watch_path watch.con watch.path |> Store.Path.of_string in
 	let perms = lookup_watch_perms oldroot root abspath in
 	if Perms.can_fire_watch watch.con.perm perms then
-		fire_single_watch_unchecked watch
+		fire_single_watch_unchecked source watch
 	else
 		let perms = perms |> List.map (Perms.Node.to_string ~sep:" ") |> String.concat ", " in
 		let con = get_domstr watch.con in
 		Logging.watch_not_fired ~con perms (Store.Path.to_string abspath)
 
-let fire_watch roots watch path =
+let fire_watch source roots watch path =
 	let new_path =
 		if watch.is_relative && path.[0] = '/'
 		then begin
@@ -232,7 +360,7 @@ let fire_watch roots watch path =
 		end else
 			path
 	in
-	fire_single_watch roots { watch with path = new_path }
+	fire_single_watch source roots { watch with path = new_path }
 
 (* Search for a valid unused transaction id. *)
 let rec valid_transaction_id con proposed_id =
@@ -280,6 +408,7 @@ let do_input con = Xenbus.Xb.input con.xb
 let has_partial_input con = Xenbus.Xb.has_partial_input con.xb
 let has_more_input con = Xenbus.Xb.has_more_input con.xb
 
+let can_input con = Xenbus.Xb.can_input con.xb && BoundedPipe.is_empty con.pending_source_watchevents
 let has_output con = Xenbus.Xb.has_output con.xb
 let has_old_output con = Xenbus.Xb.has_old_output con.xb
 let has_new_output con = Xenbus.Xb.has_new_output con.xb
@@ -323,7 +452,7 @@ let prevents_live_update con = not (is_bad con)
 	&& (has_extra_connection_data con || has_transaction_data con)
 
 let has_more_work con =
-	has_more_input con || not (has_old_output con) && has_new_output con
+	(has_more_input con && can_input con) || not (has_old_output con) && has_new_output con
 
 let incr_ops con = con.stat_nb_ops <- con.stat_nb_ops + 1
 
diff --git a/tools/ocaml/xenstored/connections.ml b/tools/ocaml/xenstored/connections.ml
index 3c7429fe7f61..7d68c583b43a 100644
--- a/tools/ocaml/xenstored/connections.ml
+++ b/tools/ocaml/xenstored/connections.ml
@@ -22,22 +22,30 @@ type t = {
 	domains: (int, Connection.t) Hashtbl.t;
 	ports: (Xeneventchn.t, Connection.t) Hashtbl.t;
 	mutable watches: Connection.watch list Trie.t;
+	mutable has_pending_watchevents: Connection.Watch.Set.t
 }
 
 let create () = {
 	anonymous = Hashtbl.create 37;
 	domains = Hashtbl.create 37;
 	ports = Hashtbl.create 37;
-	watches = Trie.create ()
+	watches = Trie.create ();
+	has_pending_watchevents = Connection.Watch.Set.empty;
 }
 
+let get_capacity () =
+	(* not multiplied by maxwatch on purpose: 2nd queue in watch itself! *)
+	{ Xenbus.Xb.maxoutstanding = !Define.maxoutstanding; maxwatchevents = !Define.maxwatchevents }
+
 let add_anonymous cons fd =
-	let xbcon = Xenbus.Xb.open_fd fd in
+	let capacity = get_capacity () in
+	let xbcon = Xenbus.Xb.open_fd fd ~capacity in
 	let con = Connection.create xbcon None in
 	Hashtbl.add cons.anonymous (Xenbus.Xb.get_fd xbcon) con
 
 let add_domain cons dom =
-	let xbcon = Xenbus.Xb.open_mmap (Domain.get_interface dom) (fun () -> Domain.notify dom) in
+	let capacity = get_capacity () in
+	let xbcon = Xenbus.Xb.open_mmap ~capacity (Domain.get_interface dom) (fun () -> Domain.notify dom) in
 	let con = Connection.create xbcon (Some dom) in
 	Hashtbl.add cons.domains (Domain.get_id dom) con;
 	match Domain.get_port dom with
@@ -48,7 +56,9 @@ let select ?(only_if = (fun _ -> true)) cons =
 	Hashtbl.fold (fun _ con (ins, outs) ->
 		if (only_if con) then (
 			let fd = Connection.get_fd con in
-			(fd :: ins,  if Connection.has_output con then fd :: outs else outs)
+			let in_fds = if Connection.can_input con then fd :: ins else ins in
+			let out_fds = if Connection.has_output con then fd :: outs else outs in
+			in_fds, out_fds
 		) else (ins, outs)
 	)
 	cons.anonymous ([], [])
@@ -67,10 +77,17 @@ let del_watches_of_con con watches =
 	| [] -> None
 	| ws -> Some ws
 
+let del_watches cons con =
+	Connection.del_watches con;
+	cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+	cons.has_pending_watchevents <-
+		cons.has_pending_watchevents |> Connection.Watch.Set.filter @@ fun w ->
+		Connection.get_con w != con
+
 let del_anonymous cons con =
 	try
 		Hashtbl.remove cons.anonymous (Connection.get_fd con);
-		cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+		del_watches cons con;
 		Connection.close con
 	with exn ->
 		debug "del anonymous %s" (Printexc.to_string exn)
@@ -85,7 +102,7 @@ let del_domain cons id =
 		    | Some p -> Hashtbl.remove cons.ports p
 		    | None -> ())
 		 | None -> ());
-		cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+		del_watches cons con;
 		Connection.close con
 	with exn ->
 		debug "del domain %u: %s" id (Printexc.to_string exn)
@@ -136,31 +153,33 @@ let del_watch cons con path token =
 		cons.watches <- Trie.set cons.watches key watches;
  	watch
 
-let del_watches cons con =
-	Connection.del_watches con;
-	cons.watches <- Trie.map (del_watches_of_con con) cons.watches
-
 (* path is absolute *)
-let fire_watches ?oldroot root cons path recurse =
+let fire_watches ?oldroot source root cons path recurse =
 	let key = key_of_path path in
 	let path = Store.Path.to_string path in
 	let roots = oldroot, root in
 	let fire_watch _ = function
 		| None         -> ()
-		| Some watches -> List.iter (fun w -> Connection.fire_watch roots w path) watches
+		| Some watches -> List.iter (fun w -> Connection.fire_watch source roots w path) watches
 	in
 	let fire_rec _x = function
 		| None         -> ()
 		| Some watches ->
-			List.iter (Connection.fire_single_watch roots) watches
+			List.iter (Connection.fire_single_watch source roots) watches
 	in
 	Trie.iter_path fire_watch cons.watches key;
 	if recurse then
 		Trie.iter fire_rec (Trie.sub cons.watches key)
 
+let send_watchevents cons con =
+	cons.has_pending_watchevents <-
+		cons.has_pending_watchevents |> Connection.Watch.Set.filter Connection.Watch.flush_events;
+	Connection.source_flush_watchevents con
+
 let fire_spec_watches root cons specpath =
+	let source = find_domain cons 0 in
 	iter cons (fun con ->
-		List.iter (Connection.fire_single_watch (None, root)) (Connection.get_watches con specpath))
+		List.iter (Connection.fire_single_watch source (None, root)) (Connection.get_watches con specpath))
 
 let set_target cons domain target_domain =
 	let con = find_domain cons domain in
@@ -197,6 +216,16 @@ let debug cons =
 	let domains = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.domains [] in
 	String.concat "" (domains @ anonymous)
 
+let debug_watchevents cons con =
+	(* == (physical equality)
+	   has to be used here because w.con.xb.backend might contain a [unit->unit] value causing regular
+	   comparison to fail due to having a 'functional value' which cannot be compared.
+	 *)
+	let s = cons.has_pending_watchevents |> Connection.Watch.Set.filter (fun w -> w.con == con) in
+	let pending = s |> Connection.Watch.Set.elements
+		|> List.map (fun w -> Connection.Watch.pending_watchevents w) |> List.fold_left (+) 0 in
+	Printf.sprintf "Watches with pending events: %d, pending events total: %d" (Connection.Watch.Set.cardinal s) pending
+
 let filter ~f cons =
 	let fold _ v acc = if f v then v :: acc else acc in
 	[]
diff --git a/tools/ocaml/xenstored/define.ml b/tools/ocaml/xenstored/define.ml
index ba63a8147e09..327b6d795ec7 100644
--- a/tools/ocaml/xenstored/define.ml
+++ b/tools/ocaml/xenstored/define.ml
@@ -24,6 +24,13 @@ let default_config_dir = Paths.xen_config_dir
 let maxwatch = ref (100)
 let maxtransaction = ref (10)
 let maxrequests = ref (1024)   (* maximum requests per transaction *)
+let maxoutstanding = ref (1024) (* maximum outstanding requests, i.e. in-flight requests / domain *)
+let maxwatchevents = ref (1024)
+(*
+	maximum outstanding watch events per watch,
+	recommended >= maxoutstanding to avoid blocking backend transactions due to
+	malicious frontends
+ *)
 
 let gc_max_overhead = ref 120 (* 120% see comment in xenstored.ml *)
 let conflict_burst_limit = ref 5.0
diff --git a/tools/ocaml/xenstored/oxenstored.conf.in b/tools/ocaml/xenstored/oxenstored.conf.in
index 4ae48e42d47d..9d034e744b4b 100644
--- a/tools/ocaml/xenstored/oxenstored.conf.in
+++ b/tools/ocaml/xenstored/oxenstored.conf.in
@@ -62,6 +62,8 @@ quota-maxwatch = 100
 quota-transaction = 10
 quota-maxrequests = 1024
 quota-path-max = 1024
+quota-maxoutstanding = 1024
+quota-maxwatchevents = 1024
 
 # Activate filed base backend
 persistent = false
diff --git a/tools/ocaml/xenstored/process.ml b/tools/ocaml/xenstored/process.ml
index 2d67456a2aa0..6dcedfda86e4 100644
--- a/tools/ocaml/xenstored/process.ml
+++ b/tools/ocaml/xenstored/process.ml
@@ -57,7 +57,7 @@ let split_one_path data con =
 	| path :: "" :: [] -> Store.Path.create path (Connection.get_path con)
 	| _                -> raise Invalid_Cmd_Args
 
-let process_watch t cons =
+let process_watch source t cons =
 	let oldroot = t.Transaction.oldroot in
 	let newroot = Store.get_root t.store in
 	let ops = Transaction.get_paths t |> List.rev in
@@ -67,8 +67,9 @@ let process_watch t cons =
 		| Xenbus.Xb.Op.Rm       -> true, None, oldroot
 		| Xenbus.Xb.Op.Setperms -> false, Some oldroot, newroot
 		| _              -> raise (Failure "huh ?") in
-		Connections.fire_watches ?oldroot root cons (snd op) recurse in
-	List.iter (fun op -> do_op_watch op cons) ops
+		Connections.fire_watches ?oldroot source root cons (snd op) recurse in
+	List.iter (fun op -> do_op_watch op cons) ops;
+	Connections.send_watchevents cons source
 
 let create_implicit_path t perm path =
 	let dirname = Store.Path.get_parent path in
@@ -234,6 +235,20 @@ let do_debug con t _domains cons data =
 	| "watches" :: _ ->
 		let watches = Connections.debug cons in
 		Some (watches ^ "\000")
+	| "xenbus" :: domid :: _ ->
+		let domid = int_of_string domid in
+		let con = Connections.find_domain cons domid in
+		let s = Printf.sprintf "xenbus: %s; overflow queue length: %d, can_input: %b, has_more_input: %b, has_old_output: %b, has_new_output: %b, has_more_work: %b. pending: %s"
+			(Xenbus.Xb.debug con.xb)
+			(Connection.source_pending_watchevents con)
+			(Connection.can_input con)
+			(Connection.has_more_input con)
+			(Connection.has_old_output con)
+			(Connection.has_new_output con)
+			(Connection.has_more_work con)
+			(Connections.debug_watchevents cons con)
+		in
+		Some s
 	| "mfn" :: domid :: _ ->
 		let domid = int_of_string domid in
 		let con = Connections.find_domain cons domid in
@@ -342,7 +357,7 @@ let reply_ack fct con t doms cons data =
 	fct con t doms cons data;
 	Packet.Ack (fun () ->
 		if Transaction.get_id t = Transaction.none then
-			process_watch t cons
+			process_watch con t cons
 	)
 
 let reply_data fct con t doms cons data =
@@ -501,7 +516,7 @@ let do_watch con t _domains cons data =
 	Packet.Ack (fun () ->
 		(* xenstore.txt says this watch is fired immediately,
 		   implying even if path doesn't exist or is unreadable *)
-		Connection.fire_single_watch_unchecked watch)
+		Connection.fire_single_watch_unchecked con watch)
 
 let do_unwatch con _t _domains cons data =
 	let (node, token) =
@@ -532,7 +547,7 @@ let do_transaction_end con t domains cons data =
 	if not success then
 		raise Transaction_again;
 	if commit then begin
-		process_watch t cons;
+		process_watch con t cons;
 		match t.Transaction.ty with
 		| Transaction.No ->
 			() (* no need to record anything *)
@@ -699,7 +714,8 @@ let process_packet ~store ~cons ~doms ~con ~req =
 let do_input store cons doms con =
 	let newpacket =
 		try
-			Connection.do_input con
+			if Connection.can_input con then Connection.do_input con
+			else None
 		with Xenbus.Xb.Reconnect ->
 			info "%s requests a reconnect" (Connection.get_domstr con);
 			History.reconnect con;
@@ -727,6 +743,7 @@ let do_input store cons doms con =
 		Connection.incr_ops con
 
 let do_output _store _cons _doms con =
+	Connection.source_flush_watchevents con;
 	if Connection.has_output con then (
 		if Connection.has_new_output con then (
 			let packet = Connection.peek_output con in
diff --git a/tools/ocaml/xenstored/xenstored.ml b/tools/ocaml/xenstored/xenstored.ml
index 3b57ad016dfb..c799e20f1145 100644
--- a/tools/ocaml/xenstored/xenstored.ml
+++ b/tools/ocaml/xenstored/xenstored.ml
@@ -103,6 +103,8 @@ let parse_config filename =
 		("quota-maxentity", Config.Set_int Quota.maxent);
 		("quota-maxsize", Config.Set_int Quota.maxsize);
 		("quota-maxrequests", Config.Set_int Define.maxrequests);
+		("quota-maxoutstanding", Config.Set_int Define.maxoutstanding);
+		("quota-maxwatchevents", Config.Set_int Define.maxwatchevents);
 		("quota-path-max", Config.Set_int Define.path_max);
 		("gc-max-overhead", Config.Set_int Define.gc_max_overhead);
 		("test-eagain", Config.Set_bool Transaction.test_eagain);