From 8dc882fe86e4235582b782d9198491b488d5645b Mon Sep 17 00:00:00 2001 From: Irina Boverman Date: Mar 19 2021 19:53:39 +0000 Subject: 0.15.0-2 --- diff --git a/dispatch.patch b/dispatch.patch new file mode 100644 index 0000000..c03cc3c --- /dev/null +++ b/dispatch.patch @@ -0,0 +1,2998 @@ +From b07d0de0f6c8d115047c6ceca453ca495b4f483f Mon Sep 17 00:00:00 2001 +From: Ganesh Murthy +Date: Tue, 9 Feb 2021 17:07:47 -0500 +Subject: [PATCH 02/16] DISPATCH-1909: Requests are now recorded just before + freeing streams. Also modified test to account for no stats being present. + This closes #1024 + +--- + src/adaptors/http2/http2_adaptor.c | 82 ++++++++++++++---------------- + tests/system_tests_http2.py | 33 +++++++----- + 2 files changed, 57 insertions(+), 58 deletions(-) + +diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c +index a08328c7..2330ece8 100644 +--- a/src/adaptors/http2/http2_adaptor.c ++++ b/src/adaptors/http2/http2_adaptor.c +@@ -68,6 +68,7 @@ typedef struct qdr_http2_adaptor_t { + static qdr_http2_adaptor_t *http2_adaptor; + + static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context); ++static void _http_record_request(qdr_http2_connection_t *conn, qdr_http2_stream_data_t *stream_data); + + static void set_stream_data_delivery_flags(qdr_http2_stream_data_t * stream_data, qdr_delivery_t *dlv) { + if (dlv == stream_data->in_dlv) { +@@ -284,6 +285,10 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on + + qdr_http2_session_data_t *session_data = stream_data->session_data; + qdr_http2_connection_t *conn = session_data->conn; ++ ++ // Record the request just before freeing the stream. ++ _http_record_request(conn, stream_data); ++ + if (!on_shutdown) { + if (conn->qdr_conn && stream_data->in_link) { + qdr_link_set_context(stream_data->in_link, 0); +@@ -339,6 +344,14 @@ static char *get_address_string(pn_raw_connection_t *pn_raw_conn) + + void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdown) + { ++ // Free all the stream data associated with this connection/session. ++ qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams); ++ while (stream_data) { ++ qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Freeing stream in free_qdr_http2_connection", stream_data->session_data->conn->conn_id, stream_data->stream_id); ++ free_http2_stream_data(stream_data, on_shutdown); ++ stream_data = DEQ_HEAD(http_conn->session_data->streams); ++ } ++ + if(http_conn->remote_address) { + free(http_conn->remote_address); + http_conn->remote_address = 0; +@@ -353,15 +366,6 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo + http_conn->ping_timer = 0; + } + +- // Free all the stream data associated with this connection/session. +- qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams); +- while (stream_data) { +- DEQ_REMOVE_HEAD(http_conn->session_data->streams); +- qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Freeing stream in free_qdr_http2_connection", stream_data->session_data->conn->conn_id, stream_data->stream_id); +- free_http2_stream_data(stream_data, on_shutdown); +- stream_data = DEQ_HEAD(http_conn->session_data->streams); +- } +- + http_conn->context.context = 0; + + nghttp2_session_del(http_conn->session_data->session); +@@ -378,6 +382,10 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo + buff = DEQ_HEAD(http_conn->granted_read_buffs); + } + ++ if (http_conn->delete_egress_connections) { ++ http_conn->config = 0; ++ } ++ + qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Freeing http2 connection in free_qdr_http2_connection", http_conn->conn_id); + + free_qdr_http2_connection_t(http_conn); +@@ -864,9 +872,13 @@ static void _http_record_request(qdr_http2_connection_t *conn, qdr_http2_stream_ + } else { + remote_addr = conn->config->host; + } +- qd_http_record_request(http2_adaptor->core, stream_data->method, stream_data->request_status, +- conn->config->address, remote_addr, conn->config->site, +- stream_data->remote_site, conn->ingress, stream_data->bytes_in, stream_data->bytes_out, ++ qd_http_record_request(http2_adaptor->core, ++ stream_data->method, ++ stream_data->request_status, ++ conn->config->address, ++ remote_addr, conn->config->site, ++ stream_data->remote_site, ++ conn->ingress, stream_data->bytes_in, stream_data->bytes_out, + stream_data->stop && stream_data->start ? stream_data->stop - stream_data->start : 0); + if (free_remote_addr) { + free(remote_addr); +@@ -912,10 +924,6 @@ static int on_frame_recv_callback(nghttp2_session *session, + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] NGHTTP2_DATA NGHTTP2_FLAG_END_STREAM flag received, setting receive_complete = true", conn->conn_id, stream_id); + } + advance_stream_status(stream_data); +- +- if (!conn->ingress) { +- _http_record_request(conn, stream_data); +- } + } + + if (stream_data->in_dlv && !stream_data->stream_force_closed) { +@@ -979,9 +987,6 @@ static int on_frame_recv_callback(nghttp2_session *session, + qd_message_set_receive_complete(stream_data->message); + advance_stream_status(stream_data); + receive_complete = true; +- if (!conn->ingress) { +- _http_record_request(conn, stream_data); +- } + } + + if (stream_data->entire_footer_arrived) { +@@ -1095,9 +1100,6 @@ ssize_t read_data_callback(nghttp2_session *session, + stream_data->next_stream_data = 0; + } + stream_data->out_dlv_local_disposition = PN_ACCEPTED; +- if ((*data_flags & NGHTTP2_DATA_FLAG_EOF) && conn->ingress) { +- _http_record_request(conn, stream_data); +- } + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, payload_length=0 and next_stream_data=QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id); + } + else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_FOOTER_OK) { +@@ -1166,10 +1168,6 @@ ssize_t read_data_callback(nghttp2_session *session, + } + + stream_data->bytes_out += bytes_to_send; +- +- if ((*data_flags & NGHTTP2_DATA_FLAG_EOF) && conn->ingress) { +- _http_record_request(conn, stream_data); +- } + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback returning bytes_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send); + return bytes_to_send; + } +@@ -1251,9 +1249,6 @@ ssize_t read_data_callback(nghttp2_session *session, + return NGHTTP2_ERR_DEFERRED; + } + +- if ((*data_flags & NGHTTP2_DATA_FLAG_EOF) && conn->ingress) { +- _http_record_request(conn, stream_data); +- } + qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback Returning zero", conn->conn_id, stream_data->stream_id); + return 0; + } +@@ -2055,8 +2050,6 @@ static void restart_streams(qdr_http2_connection_t *http_conn) + static void qdr_del_http2_connection_CT(qdr_core_t *core, qdr_action_t *action, bool discard) + { + qdr_http2_connection_t *conn = (qdr_http2_connection_t*) action->args.general.context_1; +- if (conn->config) +- qd_log(http2_adaptor->log_source, QD_LOG_DEBUG, "Removed http2 connection %s", conn->config->host_port); + free_qdr_http2_connection(conn, false); + } + +@@ -2097,7 +2090,6 @@ static void handle_disconnected(qdr_http2_connection_t* conn) + conn->stream_dispatcher_stream_data = 0; + + if (conn->delete_egress_connections) { +- conn->config = 0; + close_connections(conn); + } + } +@@ -2445,6 +2437,19 @@ static void qdr_http2_adaptor_final(void *adaptor_context) + qdr_http2_adaptor_t *adaptor = (qdr_http2_adaptor_t*) adaptor_context; + qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor); + ++ // Free all remaining connections. ++ qdr_http2_connection_t *http_conn = DEQ_HEAD(adaptor->connections); ++ while (http_conn) { ++ if (http_conn->stream_dispatcher_stream_data) { ++ qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Freeing stream_data (stream_dispatcher, qdr_http2_adaptor_final) (%lx)", http_conn->conn_id, (long) http_conn->stream_dispatcher_stream_data); ++ free_qdr_http2_stream_data_t(http_conn->stream_dispatcher_stream_data); ++ http_conn->stream_dispatcher_stream_data = 0; ++ } ++ qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Freeing http2 connection (calling free_qdr_http2_connection)", http_conn->conn_id); ++ free_qdr_http2_connection(http_conn, true); ++ http_conn = DEQ_HEAD(adaptor->connections); ++ } ++ + // Free all http listeners + qd_http_listener_t *li = DEQ_HEAD(adaptor->listeners); + while (li) { +@@ -2459,19 +2464,6 @@ static void qdr_http2_adaptor_final(void *adaptor_context) + ct = DEQ_HEAD(adaptor->connectors); + } + +- // Free all remaining connections. +- qdr_http2_connection_t *http_conn = DEQ_HEAD(adaptor->connections); +- while (http_conn) { +- if (http_conn->stream_dispatcher_stream_data) { +- qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Freeing stream_data (stream_dispatcher, qdr_http2_adaptor_final) (%lx)", http_conn->conn_id, (long) http_conn->stream_dispatcher_stream_data); +- free_qdr_http2_stream_data_t(http_conn->stream_dispatcher_stream_data); +- http_conn->stream_dispatcher_stream_data = 0; +- } +- qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Freeing http2 connection (calling free_qdr_http2_connection)", http_conn->conn_id); +- free_qdr_http2_connection(http_conn, true); +- http_conn = DEQ_HEAD(adaptor->connections); +- } +- + sys_mutex_free(adaptor->lock); + nghttp2_session_callbacks_del(adaptor->callbacks); + http2_adaptor = NULL; +diff --git a/tests/system_tests_http2.py b/tests/system_tests_http2.py +index fb9d6716..59a7a23a 100644 +--- a/tests/system_tests_http2.py ++++ b/tests/system_tests_http2.py +@@ -281,7 +281,7 @@ class Http2TestOneStandaloneRouter(Http2TestBase, CommonHttp2Tests): + listen_port=int(os.getenv('SERVER_LISTEN_PORT')), + py_string='python3', + server_file="http2_server.py") +- name = "http2-test-router" ++ name = "http2-test-standalone-router" + cls.connector_name = 'connectorToBeDeleted' + cls.connector_props = { + 'port': os.getenv('SERVER_LISTEN_PORT'), +@@ -310,20 +310,24 @@ class Http2TestOneStandaloneRouter(Http2TestBase, CommonHttp2Tests): + def test_000_stats(self): + # Run curl 127.0.0.1:port --http2-prior-knowledge + address = self.router_qdra.http_addresses[0] +- self.run_curl(address=address) ++ qd_manager = QdManager(self, address=self.router_qdra.addresses[0]) ++ ++ # First request ++ out = self.run_curl(address=address) ++ ++ # Second request + address = self.router_qdra.http_addresses[0] + "/myinfo" + out = self.run_curl(args=['-d', 'fname=Mickey&lname=Mouse', '-X', 'POST'], address=address) + self.assertIn('Success! Your first name is Mickey, last name is Mouse', out) +- qd_manager = QdManager(self, address=self.router_qdra.addresses[0]) ++ + stats = qd_manager.query('org.apache.qpid.dispatch.httpRequestInfo') + self.assertEqual(len(stats), 2) + + # Give time for the core thread to augment the stats. + i = 0 + while i < 3: +- s = stats[0] +- i += 1 +- if s.get('requests') < 2: ++ if not stats or stats[0].get('requests') < 2: ++ i += 1 + sleep(1) + stats = qd_manager.query('org.apache.qpid.dispatch.httpRequestInfo') + else: +@@ -475,20 +479,23 @@ class Http2TestTwoRouter(Http2TestBase, CommonHttp2Tests): + def test_000_stats(self): + # Run curl 127.0.0.1:port --http2-prior-knowledge + address = self.router_qdra.http_addresses[0] ++ qd_manager_a = QdManager(self, address=self.router_qdra.addresses[0]) ++ stats_a = qd_manager_a.query('org.apache.qpid.dispatch.httpRequestInfo') ++ ++ # First request + self.run_curl(address=address) + address = self.router_qdra.http_addresses[0] + "/myinfo" ++ ++ # Second request + out = self.run_curl(args=['-d', 'fname=Mickey&lname=Mouse', '-X', 'POST'], address=address) + self.assertIn('Success! Your first name is Mickey, last name is Mouse', out) +- qd_manager_a = QdManager(self, address=self.router_qdra.addresses[0]) +- stats_a = qd_manager_a.query('org.apache.qpid.dispatch.httpRequestInfo') + + # Give time for the core thread to augment the stats. + i = 0 + while i < 3: +- s = stats_a[0] +- i += 1 +- if s.get('requests') < 2: ++ if not stats_a or stats_a[0].get('requests') < 2: + sleep(1) ++ i += 1 + stats_a = qd_manager_a.query('org.apache.qpid.dispatch.httpRequestInfo') + else: + break +@@ -505,8 +512,8 @@ class Http2TestTwoRouter(Http2TestBase, CommonHttp2Tests): + i = 0 + while i < 3: + s = stats_b[0] +- i += 1 +- if s.get('requests') < 2: ++ if not stats_b or stats_b[0].get('requests') < 2: ++ i += 1 + sleep(1) + stats_b = qd_manager_b.query('org.apache.qpid.dispatch.httpRequestInfo') + else: +-- +2.20.1 + +From cb97f9c84ddaadece8ec3495e18c6b3a0c35ec3b Mon Sep 17 00:00:00 2001 +From: Fernando Giorgetti +Date: Fri, 12 Feb 2021 09:17:06 -0300 +Subject: [PATCH 03/16] DISPATCH-1586 - Add allocator metrics (#696) + +--- + src/http-libwebsockets.c | 140 +++++++++++++++++++++++++++++++++++++-- + 1 file changed, 136 insertions(+), 4 deletions(-) + +diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c +index 9a2e803e..66cf1874 100644 +--- a/src/http-libwebsockets.c ++++ b/src/http-libwebsockets.c +@@ -460,6 +460,11 @@ typedef struct metric_definition { + int_metric value; + } metric_definition; + ++typedef struct allocator_metric_definition { ++ const char* name; ++ qd_alloc_stats_t *(*fn)(void); ++} allocator_metric_definition; ++ + static int stats_get_connections(qdr_global_stats_t *stats) { return stats->connections; } + static int stats_get_links(qdr_global_stats_t *stats) { return stats->links; } + static int stats_get_addrs(qdr_global_stats_t *stats) { return stats->addrs; } +@@ -483,6 +488,43 @@ static int stats_get_deliveries_stuck(qdr_global_stats_t *stats) { return stats- + static int stats_get_links_blocked(qdr_global_stats_t *stats) { return stats->links_blocked; } + static int stats_get_deliveries_redirected_to_fallback(qdr_global_stats_t *stats) { return stats->deliveries_redirected_to_fallback; } + ++qd_alloc_stats_t *alloc_stats_qd_bitmask_t(void); ++qd_alloc_stats_t *alloc_stats_qd_buffer_t(void); ++qd_alloc_stats_t *alloc_stats_qd_composed_field_t(void); ++qd_alloc_stats_t *alloc_stats_qd_composite_t(void); ++qd_alloc_stats_t *alloc_stats_qd_connection_t(void); ++qd_alloc_stats_t *alloc_stats_qd_hash_handle_t(void); ++qd_alloc_stats_t *alloc_stats_qd_hash_item_t(void); ++qd_alloc_stats_t *alloc_stats_qd_iterator_t(void); ++qd_alloc_stats_t *alloc_stats_qd_link_ref_t(void); ++qd_alloc_stats_t *alloc_stats_qd_link_t(void); ++qd_alloc_stats_t *alloc_stats_qd_listener_t(void); ++qd_alloc_stats_t *alloc_stats_qd_log_entry_t(void); ++qd_alloc_stats_t *alloc_stats_qd_management_context_t(void); ++qd_alloc_stats_t *alloc_stats_qd_message_content_t(void); ++qd_alloc_stats_t *alloc_stats_qd_message_t(void); ++qd_alloc_stats_t *alloc_stats_qd_node_t(void); ++qd_alloc_stats_t *alloc_stats_qd_parse_node_t(void); ++qd_alloc_stats_t *alloc_stats_qd_parsed_field_t(void); ++qd_alloc_stats_t *alloc_stats_qd_timer_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_action_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_address_config_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_address_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_connection_info_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_connection_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_connection_work_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_core_timer_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_delivery_cleanup_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_delivery_ref_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_delivery_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_field_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_general_work_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_link_ref_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_link_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_link_work_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_query_t(void); ++qd_alloc_stats_t *alloc_stats_qdr_terminus_t(void); ++ + static struct metric_definition metrics[] = { + {"qdr_connections_total", "gauge", stats_get_connections}, + {"qdr_links_total", "gauge", stats_get_links}, +@@ -509,9 +551,60 @@ static struct metric_definition metrics[] = { + }; + static size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]); + ++static struct allocator_metric_definition allocator_metrics[] = { ++ {"qdr_allocator_qd_bitmask_t", alloc_stats_qd_bitmask_t}, ++ {"qdr_allocator_qd_buffer_t", alloc_stats_qd_buffer_t}, ++ {"qdr_allocator_qd_composed_field_t", alloc_stats_qd_composed_field_t}, ++ {"qdr_allocator_qd_composite_t", alloc_stats_qd_composite_t}, ++ {"qdr_allocator_qd_connection_t", alloc_stats_qd_connection_t}, ++ {"qdr_allocator_qd_hash_handle_t", alloc_stats_qd_hash_handle_t}, ++ {"qdr_allocator_qd_hash_item_t", alloc_stats_qd_hash_item_t}, ++ {"qdr_allocator_qd_iterator_t", alloc_stats_qd_iterator_t}, ++ {"qdr_allocator_qd_link_ref_t", alloc_stats_qd_link_ref_t}, ++ {"qdr_allocator_qd_link_t", alloc_stats_qd_link_t}, ++ {"qdr_allocator_qd_listener_t", alloc_stats_qd_listener_t}, ++ {"qdr_allocator_qd_log_entry_t", alloc_stats_qd_log_entry_t}, ++ {"qdr_allocator_qd_management_context_t", alloc_stats_qd_management_context_t}, ++ {"qdr_allocator_qd_message_content_t", alloc_stats_qd_message_content_t}, ++ {"qdr_allocator_qd_message_t", alloc_stats_qd_message_t}, ++ {"qdr_allocator_qd_node_t", alloc_stats_qd_node_t}, ++ {"qdr_allocator_qd_parse_node_t", alloc_stats_qd_parse_node_t}, ++ {"qdr_allocator_qd_parsed_field_t", alloc_stats_qd_parsed_field_t}, ++ {"qdr_allocator_qd_timer_t", alloc_stats_qd_timer_t}, ++ {"qdr_allocator_qdr_action_t", alloc_stats_qdr_action_t}, ++ {"qdr_allocator_qdr_address_config_t", alloc_stats_qdr_address_config_t}, ++ {"qdr_allocator_qdr_address_t", alloc_stats_qdr_address_t}, ++ {"qdr_allocator_qdr_connection_info_t", alloc_stats_qdr_connection_info_t}, ++ {"qdr_allocator_qdr_connection_t", alloc_stats_qdr_connection_t}, ++ {"qdr_allocator_qdr_connection_work_t", alloc_stats_qdr_connection_work_t}, ++ {"qdr_allocator_qdr_core_timer_t", alloc_stats_qdr_core_timer_t}, ++ {"qdr_allocator_qdr_delivery_cleanup_t", alloc_stats_qdr_delivery_cleanup_t}, ++ {"qdr_allocator_qdr_delivery_ref_t", alloc_stats_qdr_delivery_ref_t}, ++ {"qdr_allocator_qdr_delivery_t", alloc_stats_qdr_delivery_t}, ++ {"qdr_allocator_qdr_field_t", alloc_stats_qdr_field_t}, ++ {"qdr_allocator_qdr_general_work_t", alloc_stats_qdr_general_work_t}, ++ {"qdr_allocator_qdr_link_ref_t", alloc_stats_qdr_link_ref_t}, ++ {"qdr_allocator_qdr_link_t", alloc_stats_qdr_link_t}, ++ {"qdr_allocator_qdr_link_work_t", alloc_stats_qdr_link_work_t}, ++ {"qdr_allocator_qdr_query_t", alloc_stats_qdr_query_t}, ++ {"qdr_allocator_qdr_terminus_t", alloc_stats_qdr_terminus_t} ++}; ++static size_t allocator_metrics_length = sizeof(allocator_metrics)/sizeof(allocator_metrics[0]); ++ ++#define ALLOC_DATA(S, F) ((allocator_field) {#F, (S!=NULL? S->F: 0)}) ++ ++typedef struct allocator_field { ++ const char* name; ++ uint64_t value; ++} allocator_field; ++ + static bool write_stats(uint8_t **position, const uint8_t * const end, const char* name, const char* type, int value) + { + //11 chars + type + 2*name + 20 chars for int ++ // average metric name size is 30 bytes ++ // average metric type size is 8 bytes ++ // current number of metrics is 22 ++ // total metric buffer size = 22 * (11 + 8 + 2*30 + 20) = 2178 + size_t length = 11 + strlen(type) + strlen(name)*2 + 20; + if (end - *position >= length) { + *position += lws_snprintf((char*) *position, end - *position, "# TYPE %s %s\n", name, type); +@@ -522,11 +615,38 @@ static bool write_stats(uint8_t **position, const uint8_t * const end, const cha + } + } + ++static bool write_allocator_stats(uint8_t **position, const uint8_t * const end, const char* name, allocator_field field) ++{ ++ // 30 chars (static) + 2*name + 2*field.name + 20 for int ++ // average allocator metric name size is 54 bytes (name:field.name) ++ // current number of metrics is 180 ++ // total allocator buffer size = 180 * (30 + 2*54 + 20) = 28440 ++ size_t length = 30 + strlen(name)*2 + strlen(field.name)*2 + 20; ++ if (end - *position >= length) { ++ *position += lws_snprintf((char*) *position, end - *position, "# TYPE %s:%s_bytes gauge\n", name, field.name); ++ *position += lws_snprintf((char*) *position, end - *position, "%s:%s_bytes %"PRIu64"\n", name, field.name, field.value); ++ return true; ++ } else { ++ return false; ++ } ++} ++ + static bool write_metric(uint8_t **position, const uint8_t * const end, metric_definition* definition, qdr_global_stats_t* stats) + { + return write_stats(position, end, definition->name, definition->type, definition->value(stats)); + } + ++static bool write_allocator_metric(uint8_t **position, const uint8_t * const end, allocator_metric_definition* definition) ++{ ++ qd_alloc_stats_t *allocator_stats = definition->fn(); ++ if (!write_allocator_stats(position, end, definition->name, ALLOC_DATA(allocator_stats, total_alloc_from_heap))) return false; ++ if (!write_allocator_stats(position, end, definition->name, ALLOC_DATA(allocator_stats, total_free_to_heap))) return false; ++ if (!write_allocator_stats(position, end, definition->name, ALLOC_DATA(allocator_stats, held_by_threads))) return false; ++ if (!write_allocator_stats(position, end, definition->name, ALLOC_DATA(allocator_stats, batches_rebalanced_to_threads))) return false; ++ if (!write_allocator_stats(position, end, definition->name, ALLOC_DATA(allocator_stats, batches_rebalanced_to_global))) return false; ++ return true; ++} ++ + static int add_header_by_name(struct lws *wsi, const char* name, const char* value, uint8_t** position, uint8_t* end) + { + return lws_add_http_header_by_name(wsi, (unsigned char*) name, (unsigned char*) value, strlen(value), position, end); +@@ -537,7 +657,8 @@ static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason, + { + qd_http_server_t *hs = wsi_server(wsi); + stats_t *stats = (stats_t*) user; +- uint8_t buffer[LWS_PRE + 2048]; ++ // rationale for buffer size is explained at write_stats and write_allocator_stats ++ uint8_t buffer[LWS_PRE + 30618]; + uint8_t *start = &buffer[LWS_PRE], *position = start, *end = &buffer[sizeof(buffer) - LWS_PRE - 1]; + + switch (reason) { +@@ -569,15 +690,26 @@ static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason, + stats->current++; + qd_log(hs->log, QD_LOG_DEBUG, "wrote metric %lu of %lu", stats->current, metrics_length); + } else { +- qd_log(hs->log, QD_LOG_DEBUG, "insufficient space in buffer"); ++ qd_log(hs->log, QD_LOG_WARNING, "insufficient space in buffer"); + break; + } + } +- int n = stats->current < metrics_length ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL; ++ ++ int alloc_cur = 0; ++ while (alloc_cur < allocator_metrics_length) { ++ if (write_allocator_metric(&position, end, &allocator_metrics[alloc_cur])) { ++ qd_log(hs->log, QD_LOG_DEBUG, "wrote allocator metric %lu of %lu", alloc_cur, allocator_metrics_length); ++ alloc_cur++; ++ } else { ++ qd_log(hs->log, QD_LOG_WARNING, "insufficient space in buffer"); ++ break; ++ } ++ } ++ int n = (stats->current < metrics_length) || (alloc_cur < allocator_metrics_length) ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL; + + //write buffer + size_t available = position - start; +- if (lws_write(wsi, (unsigned char*) start, available, n) != available) ++ if (lws_write(wsi, (unsigned char*) start, available, n) != available) + return 1; + if (n == LWS_WRITE_HTTP_FINAL) { + if (lws_http_transaction_completed(wsi)) return -1; +-- +2.20.1 + +From 750f86f10fa19b8e0f939d2e188e86218355ece4 Mon Sep 17 00:00:00 2001 +From: Kenneth Giusti +Date: Tue, 2 Feb 2021 16:52:51 -0500 +Subject: [PATCH 04/16] DISPATCH-1948: fix alignment issues in alloc_pool.c + +This closes #1015 +--- + src/alloc_pool.c | 10 +++++++--- + 1 file changed, 7 insertions(+), 3 deletions(-) + +diff --git a/src/alloc_pool.c b/src/alloc_pool.c +index 0feb20e7..284630ec 100644 +--- a/src/alloc_pool.c ++++ b/src/alloc_pool.c +@@ -348,7 +348,8 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool) + DEQ_INSERT_TAIL(qtype->allocated, item); + sys_mutex_unlock(desc->lock); + item->header = PATTERN_FRONT; +- *((uint32_t*) ((char*) &item[1] + desc->total_size))= PATTERN_BACK; ++ const uint32_t pb = PATTERN_BACK; ++ memcpy((char*) &item[1] + desc->total_size, &pb, sizeof(pb)); + QD_MEMORY_FILL(&item[1], QD_MEMORY_INIT, desc->total_size); + #endif + return &item[1]; +@@ -414,7 +415,8 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool) + DEQ_INSERT_TAIL(qtype->allocated, item); + sys_mutex_unlock(desc->lock); + item->header = PATTERN_FRONT; +- *((uint32_t*) ((char*) &item[1] + desc->total_size))= PATTERN_BACK; ++ const uint32_t pb = PATTERN_BACK; ++ memcpy((char*) &item[1] + desc->total_size, &pb, sizeof(pb)); + QD_MEMORY_FILL(&item[1], QD_MEMORY_INIT, desc->total_size); + #endif + return &item[1]; +@@ -434,7 +436,9 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p) + assert (desc->header == PATTERN_FRONT); + assert (desc->trailer == PATTERN_BACK); + assert (item->header == PATTERN_FRONT); +- assert (*((uint32_t*) (p + desc->total_size)) == PATTERN_BACK); ++ const uint32_t pb = PATTERN_BACK; ++ (void)pb; // prevent unused warning ++ assert (memcmp(p + desc->total_size, &pb, sizeof(pb)) == 0); + assert (item->desc == desc); // Check for double-free + qd_alloc_type_t *qtype = (qd_alloc_type_t*) desc->debug; + sys_mutex_lock(desc->lock); +-- +2.20.1 + +From b1e309e1c59f50a67fdb6d5c2bd0ff54a64680bb Mon Sep 17 00:00:00 2001 +From: Kenneth Giusti +Date: Mon, 8 Feb 2021 13:25:35 -0500 +Subject: [PATCH 05/16] DISPATCH-1941: fix http1 parser to detect null + characters + +--- + src/adaptors/http1/http1_codec.c | 18 ++++++++++++++---- + src/adaptors/http1/http1_server.c | 4 ++-- + 2 files changed, 16 insertions(+), 6 deletions(-) + +diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c +index 7669d26f..a11f2260 100644 +--- a/src/adaptors/http1/http1_codec.c ++++ b/src/adaptors/http1/http1_codec.c +@@ -353,6 +353,8 @@ static void ensure_outgoing_capacity(struct encoder_t *encoder, size_t capacity) + static void write_string(struct encoder_t *encoder, const char *string) + { + size_t needed = strlen(string); ++ if (needed == 0) return; ++ + ensure_outgoing_capacity(encoder, needed); + + encoder->hrs->out_octets += needed; +@@ -492,6 +494,14 @@ static bool ensure_scratch_size(scratch_memory_t *b, size_t required) + } + + ++// return true if octet in str ++static inline bool filter_str(const char *str, uint8_t octet) ++{ ++ const char *ptr = strchr(str, (int)((unsigned int)octet)); ++ return ptr && *ptr != 0; ++} ++ ++ + // trims any optional whitespace characters at the start of 'line' + // RFC7230 defines OWS as zero or more spaces or horizontal tabs + // +@@ -538,7 +548,7 @@ static bool parse_token(qd_iterator_pointer_t *line, qd_iterator_pointer_t *toke + && (('A' <= octet && octet <= 'Z') || + ('a' <= octet && octet <= 'z') || + ('0' <= octet && octet <= '9') || +- (strchr(TOKEN_EXTRA, octet)))) { ++ (filter_str(TOKEN_EXTRA, octet)))) { + len++; + } + +@@ -1733,17 +1743,17 @@ const char *h1_codec_token_list_next(const char *start, size_t *len, const char + + if (!start) return 0; + +- while (*start && strchr(SKIPME, *start)) ++ while (*start && filter_str(SKIPME, *start)) + ++start; + + if (!*start) return 0; + + const char *end = start; +- while (*end && !strchr(SKIPME, *end)) ++ while (*end && !filter_str(SKIPME, *end)) + ++end; + + *len = end - start; +- while (*end && strchr(SKIPME, *end)) ++ while (*end && filter_str(SKIPME, *end)) + ++end; + + *next = end; +diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c +index 61a0634d..3f46e448 100644 +--- a/src/adaptors/http1/http1_server.c ++++ b/src/adaptors/http1/http1_server.c +@@ -1247,7 +1247,7 @@ static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg + + method_str = (char*) qd_iterator_copy(method_iter); + qd_iterator_free(method_iter); +- if (!method_str) { ++ if (!method_str || *method_str == 0) { + return PN_REJECTED; + } + +@@ -1267,7 +1267,7 @@ static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg + + qd_parsed_field_t *ref = qd_parse_value_by_key(app_props, TARGET_HEADER_KEY); + target_str = (char*) qd_iterator_copy(qd_parse_raw(ref)); +- if (!target_str) { ++ if (!target_str || *target_str == 0) { + outcome = PN_REJECTED; + goto exit; + } +-- +2.20.1 + +From 479a73afdd55eb0590370c037e4056514a6db4d6 Mon Sep 17 00:00:00 2001 +From: Chuck Rolke +Date: Mon, 15 Feb 2021 11:19:45 -0500 +Subject: [PATCH 06/16] DISPATCH-1955: TCP adaptor adds byte totals and other + stats to logs + + 1. Add byte total to logs where incremental values are shown. + 2. Add management stat summary when connection is closed/deleted. + +This closes #1026 +--- + src/adaptors/tcp_adaptor.c | 12 ++++++++---- + 1 file changed, 8 insertions(+), 4 deletions(-) + +diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c +index a38167f4..51d9316d 100644 +--- a/src/adaptors/tcp_adaptor.c ++++ b/src/adaptors/tcp_adaptor.c +@@ -567,7 +567,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void + int read = handle_incoming(conn); + conn->last_in_time = tcp_adaptor->core->uptime_ticks; + conn->bytes_in += read; +- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i bytes", conn->conn_id, read); ++ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i bytes. Total read %"PRIu64" bytes", conn->conn_id, read, conn->bytes_in); + while (qdr_connection_process(conn->qdr_conn)) {} + break; + } +@@ -583,9 +583,9 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void + } + } + } +- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN Wrote %zu bytes", conn->conn_id, written); + conn->last_out_time = tcp_adaptor->core->uptime_ticks; + conn->bytes_out += written; ++ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN Wrote %zu bytes. Total written %"PRIu64" bytes", conn->conn_id, written, conn->bytes_out); + while (qdr_connection_process(conn->qdr_conn)) {} + break; + } +@@ -1527,8 +1527,12 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo + qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) action->args.general.context_1; + if (conn->in_list) { + DEQ_REMOVE(tcp_adaptor->connections, conn); +- qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_del_tcp_connection_CT %s (%zu)", +- conn->conn_id, conn->config.host_port, DEQ_SIZE(tcp_adaptor->connections)); ++ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, ++ "[C%"PRIu64"] qdr_del_tcp_connection_CT %s deleted. bytes_in=%"PRIu64", bytes_out=%"PRId64", " ++ "opened_time=%"PRId64", last_in_time=%"PRId64", last_out_time=%"PRId64". Connections remaining %zu", ++ conn->conn_id, conn->config.host_port, ++ conn->bytes_in, conn->bytes_out, conn->opened_time, conn->last_in_time, conn->last_out_time, ++ DEQ_SIZE(tcp_adaptor->connections)); + } + free_qdr_tcp_connection(conn); + } +-- +2.20.1 + +From 53a5bbca01678e615b1181c95c293b658a68e9e1 Mon Sep 17 00:00:00 2001 +From: Chuck Rolke +Date: Mon, 15 Feb 2021 11:38:37 -0500 +Subject: [PATCH 07/16] DISPATCH-1964: TCP adaptor connection object should be + a pooled type + +--- + src/adaptors/tcp_adaptor.c | 9 ++++++--- + 1 file changed, 6 insertions(+), 3 deletions(-) + +diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c +index 51d9316d..fce22192 100644 +--- a/src/adaptors/tcp_adaptor.c ++++ b/src/adaptors/tcp_adaptor.c +@@ -22,6 +22,7 @@ + #include + #include + #include ++#include "qpid/dispatch/alloc_pool.h" + #include "qpid/dispatch/ctools.h" + #include "qpid/dispatch/protocol_adaptor.h" + #include "delivery.h" +@@ -79,6 +80,8 @@ struct qdr_tcp_connection_t { + }; + + DEQ_DECLARE(qdr_tcp_connection_t, qdr_tcp_connection_list_t); ++ALLOC_DECLARE(qdr_tcp_connection_t); ++ALLOC_DEFINE(qdr_tcp_connection_t); + + typedef struct qdr_tcp_adaptor_t { + qdr_core_t *core; +@@ -241,7 +244,7 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) + } + sys_mutex_free(tc->activation_lock); + //proactor will free the socket +- free(tc); ++ free_qdr_tcp_connection_t(tc); + } + + static void handle_disconnected(qdr_tcp_connection_t* conn) +@@ -597,7 +600,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void + + static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* listener) + { +- qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t); ++ qdr_tcp_connection_t* tc = new_qdr_tcp_connection_t(); + ZERO(tc); + tc->activation_lock = sys_mutex(); + tc->ingress = true; +@@ -682,7 +685,7 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) + + static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery) + { +- qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t); ++ qdr_tcp_connection_t* tc = new_qdr_tcp_connection_t(); + ZERO(tc); + tc->activation_lock = sys_mutex(); + if (initial_delivery) { +-- +2.20.1 + +From 31c23ca83df6aea7feb724811af09aa6a197dd37 Mon Sep 17 00:00:00 2001 +From: Kenneth Giusti +Date: Wed, 10 Feb 2021 17:23:34 -0500 +Subject: [PATCH 08/16] DISPATCH-1960: Refactor Q2 flow control for protocol + adaptor use. + +This closes #1027 +--- + include/qpid/dispatch/alloc_pool.h | 25 +++++++- + include/qpid/dispatch/container.h | 2 +- + include/qpid/dispatch/message.h | 27 +++++--- + src/adaptors/http1/http1_client.c | 3 +- + src/adaptors/http1/http1_server.c | 3 +- + src/adaptors/http2/http2_adaptor.c | 12 ++-- + src/adaptors/reference_adaptor.c | 6 +- + src/adaptors/tcp_adaptor.c | 3 +- + src/alloc_pool.c | 4 ++ + src/message.c | 100 ++++++++++++++++++++++------- + src/message_private.h | 10 ++- + src/router_node.c | 24 +++---- + tests/message_test.c | 25 ++++++-- + 13 files changed, 184 insertions(+), 60 deletions(-) + +diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h +index e8722a52..d872ff49 100644 +--- a/include/qpid/dispatch/alloc_pool.h ++++ b/include/qpid/dispatch/alloc_pool.h +@@ -80,7 +80,26 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool); + /** De-allocate from a thread pool. Use via ALLOC_DECLARE */ + void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p); + uint32_t qd_alloc_sequence(void *p); +-static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) { sp->ptr = 0; } ++ ++// generic safe pointer api for any alloc pool item ++ ++#define QD_SAFE_PTR_INIT(p) { .ptr = (void*)(p), .seq = qd_alloc_sequence(p) } ++ ++static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) ++{ ++ sp->ptr = 0; ++} ++ ++static inline void qd_alloc_set_safe_ptr(qd_alloc_safe_ptr_t *sp, void *p) ++{ ++ sp->ptr = p; ++ sp->seq = qd_alloc_sequence(p); ++} ++ ++static inline void *qd_alloc_deref_safe_ptr(const qd_alloc_safe_ptr_t *sp) ++{ ++ return sp->seq == qd_alloc_sequence(sp->ptr) ? sp->ptr : (void*) 0; ++} + + /** + * Declare functions new_T and alloc_T +@@ -102,8 +121,8 @@ static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) { sp->ptr = 0; } + __thread qd_alloc_pool_t *__local_pool_##T = 0; \ + T *new_##T(void) { return (T*) qd_alloc(&__desc_##T, &__local_pool_##T); } \ + void free_##T(T *p) { qd_dealloc(&__desc_##T, &__local_pool_##T, (char*) p); } \ +- void set_safe_ptr_##T(T *p, T##_sp *sp) { sp->ptr = (void*) p; sp->seq = qd_alloc_sequence((void*) p); } \ +- T *safe_deref_##T(T##_sp sp) { return sp.seq == qd_alloc_sequence((void*) sp.ptr) ? (T*) sp.ptr : (T*) 0; } \ ++ void set_safe_ptr_##T(T *p, T##_sp *sp) { qd_alloc_set_safe_ptr(sp, (void*)p); } \ ++ T *safe_deref_##T(T##_sp sp) { return (T*) qd_alloc_deref_safe_ptr((qd_alloc_safe_ptr_t*) &(sp)); } \ + qd_alloc_stats_t *alloc_stats_##T(void) { return __desc_##T.stats; } \ + void *unused##T + +diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h +index 08caba13..82d9ad8b 100644 +--- a/include/qpid/dispatch/container.h ++++ b/include/qpid/dispatch/container.h +@@ -228,7 +228,7 @@ void qd_link_close(qd_link_t *link); + void qd_link_detach(qd_link_t *link); + void qd_link_free(qd_link_t *link); + void *qd_link_get_node_context(const qd_link_t *link); +-void qd_link_restart_rx(qd_link_t *link); ++void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context); + void qd_link_q3_block(qd_link_t *link); + void qd_link_q3_unblock(qd_link_t *link); + uint64_t qd_link_link_id(const qd_link_t *link); +diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h +index 657477d0..07e144ad 100644 +--- a/include/qpid/dispatch/message.h ++++ b/include/qpid/dispatch/message.h +@@ -253,10 +253,9 @@ qd_message_t * qd_get_message_context(pn_delivery_t *delivery); + * @param msg A pointer to a message to be sent. + * @param link The outgoing link on which to send the message. + * @param strip_outbound_annotations [in] annotation control flag +- * @param restart_rx [out] indication to wake up receive process + * @param q3_stalled [out] indicates that the link is stalled due to proton-buffer-full + */ +-void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *restart_rx, bool *q3_stalled); ++void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *q3_stalled); + + /** + * Check that the message is well-formed up to a certain depth. Any part of the message that is +@@ -304,9 +303,10 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_c + * + * @param msg Pointer to a message + * @param field A composed field to be appended to the end of the message's stream ++ * @param q2_blocked Set to true if this call caused Q2 to block + * @return The number of buffers stored in the message's content + */ +-int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field); ++int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked); + + + /** +@@ -404,9 +404,10 @@ qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *msg, q + * + * @param msg Pointer to message under construction + * @param data List of buffers containing body data. ++ * @param qd_blocked Set to true if this call caused Q2 to block + * @return The number of buffers stored in the message's content + */ +-int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data); ++int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data, bool *q2_blocked); + + + /** Put string representation of a message suitable for logging in buffer. +@@ -556,12 +557,22 @@ bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg); + */ + bool qd_message_is_Q2_blocked(const qd_message_t *msg); + ++ + /** +- * Return qd_link through which the message is being received. +- * @param msg A pointer to the message +- * @return the qd_link ++ * Register a callback that will be invoked when the message has exited the Q2 ++ * blocking state. Note that the callback can be invoked on any I/O thread. ++ * The callback must be thread safe. ++ * ++ * @param msg The message to monitor. ++ * @param callback The method to invoke ++ * @param context safe pointer holding the context + */ +-qd_link_t * qd_message_get_receiving_link(const qd_message_t *msg); ++ ++typedef void (*qd_message_q2_unblocked_handler_t)(qd_alloc_safe_ptr_t context); ++void qd_message_set_q2_unblocked_handler(qd_message_t *msg, ++ qd_message_q2_unblocked_handler_t callback, ++ qd_alloc_safe_ptr_t context); ++void qd_message_clear_q2_unblocked_handler(qd_message_t *msg); + + /** + * Return message aborted state +diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c +index 1d17724d..27b25cde 100644 +--- a/src/adaptors/http1/http1_client.c ++++ b/src/adaptors/http1/http1_client.c +@@ -860,7 +860,8 @@ static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b + "[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.", + hconn->conn_id, hconn->in_link_id, len); + +- qd_message_stream_data_append(msg, body); ++ // @TODO(kgiusti): handle Q2 block event: ++ qd_message_stream_data_append(msg, body, 0); + + // + // Notify the router that more data is ready to be pushed out on the delivery +diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c +index 3f46e448..1ef6b2ab 100644 +--- a/src/adaptors/http1/http1_server.c ++++ b/src/adaptors/http1/http1_server.c +@@ -976,7 +976,8 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b + + qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv); + +- qd_message_stream_data_append(msg, body); ++ // @TODO(kgiusti): handle Q2 block event: ++ qd_message_stream_data_append(msg, body, 0); + + // + // Notify the router that more data is ready to be pushed out on the delivery +diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c +index 2330ece8..25347e40 100644 +--- a/src/adaptors/http2/http2_adaptor.c ++++ b/src/adaptors/http2/http2_adaptor.c +@@ -465,7 +465,8 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, + qd_buffer_list_t existing_buffers; + DEQ_INIT(existing_buffers); + qd_compose_take_buffers(stream_data->body, &existing_buffers); +- qd_message_stream_data_append(stream_data->message, &existing_buffers); ++ // @TODO(kgiusti): handle Q2 block event: ++ qd_message_stream_data_append(stream_data->message, &existing_buffers, 0); + stream_data->body_data_added = true; + } + } +@@ -475,7 +476,8 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, + stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); + stream_data->body_data_added = true; + } +- qd_message_stream_data_append(stream_data->message, &buffers); ++ // @TODO(kgiusti): handle Q2 block event: ++ qd_message_stream_data_append(stream_data->message, &buffers, 0); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback qd_compose_insert_binary_buffers into stream_data->message", conn->conn_id, stream_id); + } + else { +@@ -930,7 +932,8 @@ static int on_frame_recv_callback(nghttp2_session *session, + if (!stream_data->body) { + stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); + qd_compose_insert_binary(stream_data->body, 0, 0); +- qd_message_extend(stream_data->message, stream_data->body); ++ // @TODO(kgiusti): handle Q2 block event: ++ qd_message_extend(stream_data->message, stream_data->body, 0); + } + } + +@@ -965,7 +968,8 @@ static int on_frame_recv_callback(nghttp2_session *session, + if (stream_data->use_footer_properties) { + qd_compose_end_map(stream_data->footer_properties); + stream_data->entire_footer_arrived = true; +- qd_message_extend(stream_data->message, stream_data->footer_properties); ++ // @TODO(kgiusti): handle Q2 block event: ++ qd_message_extend(stream_data->message, stream_data->footer_properties, 0); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Closing footer map, extending message with footer", conn->conn_id, stream_id); + } + else { +diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c +index 2fd3ff2e..9982ad04 100644 +--- a/src/adaptors/reference_adaptor.c ++++ b/src/adaptors/reference_adaptor.c +@@ -497,7 +497,8 @@ static void on_stream(void *context) + // + // Extend the streaming message and free the composed field + // +- depth = qd_message_extend(adaptor->streaming_message, field); ++ // TODO(kgiusti): need to handle Q2 blocking event ++ depth = qd_message_extend(adaptor->streaming_message, field, 0); + qd_compose_free(field); + } + +@@ -519,7 +520,8 @@ static void on_stream(void *context) + qd_compose_insert_symbol(footer, "second"); + qd_compose_insert_string(footer, "value2"); + qd_compose_end_map(footer); +- depth = qd_message_extend(adaptor->streaming_message, footer); ++ // @TODO(kgiusti): need to handle Q2 blocking event ++ depth = qd_message_extend(adaptor->streaming_message, footer, 0); + qd_compose_free(footer); + + qd_message_set_receive_complete(adaptor->streaming_message); +diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c +index fce22192..6cc8855f 100644 +--- a/src/adaptors/tcp_adaptor.c ++++ b/src/adaptors/tcp_adaptor.c +@@ -183,7 +183,8 @@ static int handle_incoming(qdr_tcp_connection_t *conn) + grant_read_buffers(conn); + + if (conn->instream) { +- qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers); ++ // @TODO(kgiusti): handle Q2 block event: ++ qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, 0); + qdr_delivery_continue(tcp_adaptor->core, conn->instream, false); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count); + } else { +diff --git a/src/alloc_pool.c b/src/alloc_pool.c +index 284630ec..2ab65675 100644 +--- a/src/alloc_pool.c ++++ b/src/alloc_pool.c +@@ -507,6 +507,10 @@ uint32_t qd_alloc_sequence(void *p) + return 0; + + qd_alloc_item_t *item = ((qd_alloc_item_t*) p) - 1; ++#ifdef QD_MEMORY_DEBUG ++ // ensure p actually points to an alloc pool item ++ assert(item->header == PATTERN_FRONT); ++#endif + return item->sequence; + } + +diff --git a/src/message.c b/src/message.c +index 3e86f544..f81d15e9 100644 +--- a/src/message.c ++++ b/src/message.c +@@ -1020,7 +1020,8 @@ void qd_message_free(qd_message_t *in_msg) + { + if (!in_msg) return; + uint32_t rc; +- qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; ++ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; ++ qd_message_q2_unblocker_t q2_unblock = {0}; + + qd_buffer_list_free_buffers(&msg->ma_to_override); + qd_buffer_list_free_buffers(&msg->ma_trace); +@@ -1055,12 +1056,16 @@ void qd_message_free(qd_message_t *in_msg) + && was_blocked + && qd_message_Q2_holdoff_should_unblock(in_msg)) { + content->q2_input_holdoff = false; +- qd_link_restart_rx(qd_message_get_receiving_link(in_msg)); ++ q2_unblock = content->q2_unblocker; + } + + UNLOCK(content->lock); + } + ++ // the Q2 handler must be invoked outside the lock ++ if (q2_unblock.handler) ++ q2_unblock.handler(q2_unblock.context); ++ + rc = sys_atomic_dec(&content->ref_count) - 1; + if (rc == 0) { + if (content->ma_field_iter_in) +@@ -1320,7 +1325,14 @@ void qd_message_set_receive_complete(qd_message_t *in_msg) + { + if (!!in_msg) { + qd_message_content_t *content = MSG_CONTENT(in_msg); ++ ++ LOCK(content->lock); ++ + content->receive_complete = true; ++ content->q2_unblocker.handler = 0; ++ qd_nullify_safe_ptr(&content->q2_unblocker.context); ++ ++ UNLOCK(content->lock); + } + } + +@@ -1384,7 +1396,6 @@ qd_message_t *discard_receive(pn_delivery_t *delivery, + } else if (rc == PN_EOS || rc < 0) { + // End of message or error: finalize message_receive handling + msg->content->aborted = pn_delivery_aborted(delivery); +- qd_nullify_safe_ptr(&msg->content->input_link_sp); + pn_record_t *record = pn_delivery_attachments(delivery); + pn_record_set(record, PN_DELIVERY_CTX, 0); + if (msg->content->oversize) { +@@ -1392,7 +1403,7 @@ qd_message_t *discard_receive(pn_delivery_t *delivery, + // This has no effect on the received message. + msg->content->aborted = true; + } +- msg->content->receive_complete = true; ++ qd_message_set_receive_complete((qd_message_t*) msg); + break; + } else { + // rc was > 0. bytes were read and discarded. +@@ -1429,7 +1440,8 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) + if (!msg) { + msg = (qd_message_pvt_t*) qd_message(); + qd_connection_t *qdc = qd_link_connection(qdl); +- set_safe_ptr_qd_link_t(qdl, &msg->content->input_link_sp); ++ qd_alloc_safe_ptr_t sp = QD_SAFE_PTR_INIT(qdl); ++ qd_message_set_q2_unblocked_handler((qd_message_t*) msg, qd_link_q2_restart_receive, sp); + msg->strip_annotations_in = qd_connection_strip_annotations_in(qdc); + pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF); + pn_record_set(record, PN_DELIVERY_CTX, (void*) msg); +@@ -1491,8 +1503,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) + } + + content->receive_complete = true; ++ content->q2_unblocker.handler = 0; ++ qd_nullify_safe_ptr(&content->q2_unblocker.context); + content->aborted = pn_delivery_aborted(delivery); +- qd_nullify_safe_ptr(&content->input_link_sp); + + // unlink message and delivery + pn_record_set(record, PN_DELIVERY_CTX, 0); +@@ -1726,7 +1739,6 @@ static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t + void qd_message_send(qd_message_t *in_msg, + qd_link_t *link, + bool strip_annotations, +- bool *restart_rx, + bool *q3_stalled) + { + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; +@@ -1734,7 +1746,6 @@ void qd_message_send(qd_message_t *in_msg, + qd_buffer_t *buf = 0; + pn_link_t *pnl = qd_link_pn(link); + +- *restart_rx = false; + *q3_stalled = false; + + if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) { +@@ -1842,8 +1853,9 @@ void qd_message_send(qd_message_t *in_msg, + + buf = msg->cursor.buffer; + +- pn_session_t *pns = pn_link_session(pnl); +- const size_t q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER; ++ qd_message_q2_unblocker_t q2_unblock = {0}; ++ pn_session_t *pns = pn_link_session(pnl); ++ const size_t q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER; + + while (!content->aborted + && buf +@@ -1913,7 +1925,7 @@ void qd_message_send(qd_message_t *in_msg, + // set input holdoff before the deferred handler + // runs. + content->q2_input_holdoff = false; +- *restart_rx = true; ++ q2_unblock = content->q2_unblocker; + } + } + } // end free buffer +@@ -1940,6 +1952,10 @@ void qd_message_send(qd_message_t *in_msg, + UNLOCK(content->lock); + } + ++ // the Q2 handler must be invoked outside the lock ++ if (q2_unblock.handler) ++ q2_unblock.handler(q2_unblock.context); ++ + if (content->aborted) { + if (pn_link_current(pnl)) { + msg->send_complete = true; +@@ -2319,13 +2335,16 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com + } + + +-int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field) ++int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked) + { + qd_message_content_t *content = MSG_CONTENT(msg); + int count; + qd_buffer_list_t *buffers = qd_compose_buffers(field); + qd_buffer_t *buf = DEQ_HEAD(*buffers); + ++ if (q2_blocked) ++ *q2_blocked = false; ++ + LOCK(content->lock); + while (buf) { + qd_buffer_set_fanout(buf, content->fanout); +@@ -2334,6 +2353,14 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field) + + DEQ_APPEND(content->buffers, (*buffers)); + count = DEQ_SIZE(content->buffers); ++ ++ // buffers added - much check for Q2: ++ if (qd_message_Q2_holdoff_should_block(msg)) { ++ content->q2_input_holdoff = true; ++ if (q2_blocked) ++ *q2_blocked = true; ++ } ++ + UNLOCK(content->lock); + return count; + } +@@ -2549,7 +2576,8 @@ void qd_message_stream_data_release(qd_message_stream_data_t *stream_data) + + LOCK(content->lock); + +- bool was_blocked = !qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt); ++ bool was_blocked = !qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt); ++ qd_message_q2_unblocker_t q2_unblock = {0}; + + if (pvt->is_fanout) { + buf = start_buf; +@@ -2581,13 +2609,16 @@ void qd_message_stream_data_release(qd_message_stream_data_t *stream_data) + && was_blocked + && qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt)) { + content->q2_input_holdoff = false; +- qd_link_restart_rx(qd_message_get_receiving_link((qd_message_t*) pvt)); ++ q2_unblock = content->q2_unblocker; + } + + UNLOCK(content->lock); + + DEQ_REMOVE(pvt->stream_data_list, stream_data); + free_qd_message_stream_data_t(stream_data); ++ ++ if (q2_unblock.handler) ++ q2_unblock.handler(q2_unblock.context); + } + + +@@ -2820,12 +2851,6 @@ bool qd_message_is_Q2_blocked(const qd_message_t *msg) + } + + +-qd_link_t * qd_message_get_receiving_link(const qd_message_t *msg) +-{ +- return safe_deref_qd_link_t(((qd_message_pvt_t *)msg)->content->input_link_sp); +-} +- +- + bool qd_message_aborted(const qd_message_t *msg) + { + return ((qd_message_pvt_t *)msg)->content->aborted; +@@ -2847,12 +2872,15 @@ bool qd_message_oversize(const qd_message_t *msg) + } + + +-int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data) ++int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data, bool *q2_blocked) + { + unsigned int length = DEQ_SIZE(*data); + qd_composed_field_t *field = 0; + int rc = 0; + ++ if (q2_blocked) ++ *q2_blocked = false; ++ + if (length == 0) + return rc; + +@@ -2887,7 +2915,35 @@ int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data) + field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); + qd_compose_insert_binary_buffers(field, data); + +- rc = qd_message_extend(message, field); ++ rc = qd_message_extend(message, field, q2_blocked); + qd_compose_free(field); + return rc; + } ++ ++ ++void qd_message_set_q2_unblocked_handler(qd_message_t *msg, ++ qd_message_q2_unblocked_handler_t callback, ++ qd_alloc_safe_ptr_t context) ++{ ++ qd_message_content_t *content = MSG_CONTENT(msg); ++ ++ LOCK(content->lock); ++ ++ content->q2_unblocker.handler = callback; ++ content->q2_unblocker.context = context; ++ ++ UNLOCK(content->lock); ++} ++ ++ ++void qd_message_clear_Q2_unblocked_handler(qd_message_t *msg) ++{ ++ qd_message_content_t *content = MSG_CONTENT(msg); ++ ++ LOCK(content->lock); ++ ++ content->q2_unblocker.handler = 0; ++ qd_nullify_safe_ptr(&content->q2_unblocker.context); ++ ++ UNLOCK(content->lock); ++} +diff --git a/src/message_private.h b/src/message_private.h +index a8067c77..c4262a98 100644 +--- a/src/message_private.h ++++ b/src/message_private.h +@@ -73,6 +73,13 @@ struct qd_message_stream_data_t { + ALLOC_DECLARE(qd_message_stream_data_t); + DEQ_DECLARE(qd_message_stream_data_t, qd_message_stream_data_list_t); + ++ ++typedef struct { ++ qd_message_q2_unblocked_handler_t handler; ++ qd_alloc_safe_ptr_t context; ++} qd_message_q2_unblocker_t; ++ ++ + // TODO - consider using pointers to qd_field_location_t below to save memory + // TODO - provide a way to allocate a message without a lock for the link-routing case. + // It's likely that link-routing will cause no contention for the message content. +@@ -126,7 +133,8 @@ typedef struct { + uint64_t max_message_size; // configured max; 0 if no max to enforce + uint64_t bytes_received; // bytes returned by pn_link_recv() when enforcing max_message_size + uint32_t fanout; // The number of receivers for this message, including in-process subscribers. +- qd_link_t_sp input_link_sp; // message received on this link ++ ++ qd_message_q2_unblocker_t q2_unblocker; // callback and context to signal Q2 unblocked to receiver + + bool ma_parsed; // have parsed annotations in incoming message + bool discard; // Should this message be discarded? +diff --git a/src/router_node.c b/src/router_node.c +index 7dbdb0d6..70d98f91 100644 +--- a/src/router_node.c ++++ b/src/router_node.c +@@ -855,6 +855,7 @@ static void deferred_AMQP_rx_handler(void *context, bool discard) + if (!discard) { + qd_link_t *qdl = safe_deref_qd_link_t(*safe_qdl); + if (!!qdl) { ++ assert(qd_link_direction(qdl) == QD_INCOMING); + qd_router_t *qdr = (qd_router_t*) qd_link_get_node_context(qdl); + assert(qdr != 0); + while (true) { +@@ -1914,22 +1915,17 @@ static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_ + if (!pdlv) + return 0; + +- bool restart_rx = false; + bool q3_stalled = false; + + qd_message_t *msg_out = qdr_delivery_message(dlv); + +- qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &restart_rx, &q3_stalled); ++ qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &q3_stalled); + + if (q3_stalled) { + qd_link_q3_block(qlink); + qdr_link_stalled_outbound(link); + } + +- if (restart_rx) { +- qd_link_restart_rx(qd_message_get_receiving_link(msg_out)); +- } +- + bool send_complete = qdr_delivery_send_complete(dlv); + + if (send_complete) { +@@ -2059,7 +2055,10 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di + // and if it is blocked by Q2 holdoff, get the link rolling again. + // + qd_message_Q2_holdoff_disable(msg); +- qd_link_restart_rx(link); ++ ++ qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp); ++ set_safe_ptr_qd_link_t(link, safe_ptr); ++ qd_connection_invoke_deferred(qd_conn, deferred_AMQP_rx_handler, safe_ptr); + } + } + } +@@ -2131,10 +2130,13 @@ qdr_core_t *qd_router_core(qd_dispatch_t *qd) + } + + +-// called when Q2 holdoff is deactivated so we can receive more message buffers ++// invoked by an I/O thread when enough buffers have been released deactivate ++// the Q2 block. Note that this method will likely be running on a worker ++// thread that is not the same thread that "owns" the qd_link_t passed in. + // +-void qd_link_restart_rx(qd_link_t *in_link) ++void qd_link_q2_restart_receive(qd_alloc_safe_ptr_t context) + { ++ qd_link_t *in_link = (qd_link_t*) qd_alloc_deref_safe_ptr(&context); + if (!in_link) + return; + +@@ -2142,8 +2144,8 @@ void qd_link_restart_rx(qd_link_t *in_link) + + qd_connection_t *in_conn = qd_link_connection(in_link); + if (in_conn) { +- qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp); +- set_safe_ptr_qd_link_t(in_link, safe_ptr); ++ qd_link_t_sp *safe_ptr = NEW(qd_alloc_safe_ptr_t); ++ *safe_ptr = context; // use original to keep old sequence counter + qd_connection_invoke_deferred(in_conn, deferred_AMQP_rx_handler, safe_ptr); + } + } +diff --git a/tests/message_test.c b/tests/message_test.c +index 7da011c5..18a22b09 100644 +--- a/tests/message_test.c ++++ b/tests/message_test.c +@@ -738,7 +738,7 @@ static void stream_data_generate_message(qd_message_t *msg, char *s_chunk_size, + // the buffers in 'field' are inserted into message 'msg'. + qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); + qd_compose_insert_binary_buffers(field, &mule_content->buffers); +- qd_message_extend(msg, field); ++ qd_message_extend(msg, field, 0); + + // Clean up temporary resources + free(buf2); +@@ -979,13 +979,20 @@ static char *test_check_stream_data_append(void * context) + // snapshot the message buffer count to use as a baseline + const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(msg)->buffers); + +- int depth = qd_message_stream_data_append(msg, &bin_data); ++ bool blocked; ++ int depth = qd_message_stream_data_append(msg, &bin_data, &blocked); + if (depth <= body_bufct) { + // expected to add extra buffer(s) for meta-data + result = "append length is incorrect"; + goto exit; + } + ++ // expected that the append has triggered Q2 blocking: ++ if (!blocked) { ++ result = "expected Q2 block event did not occur!"; ++ goto exit; ++ } ++ + // And while we're at it, stuff in a footer + field = qd_compose(QD_PERFORMATIVE_FOOTER, 0); + qd_compose_start_map(field); +@@ -994,7 +1001,7 @@ static char *test_check_stream_data_append(void * context) + qd_compose_insert_symbol(field, "Key2"); + qd_compose_insert_string(field, "Value2"); + qd_compose_end_map(field); +- qd_message_extend(msg, field); ++ qd_message_extend(msg, field, 0); + qd_compose_free(field); + + qd_message_set_receive_complete(msg); +@@ -1123,7 +1130,7 @@ static char *test_check_stream_data_fanout(void *context) + memset(buffer, '5', 1001); + qd_compose_insert_binary(field, buffer, 5); + +- qd_message_extend(in_msg, field); ++ qd_message_extend(in_msg, field, 0); + qd_compose_free(field); + + qd_message_set_receive_complete(in_msg); +@@ -1232,6 +1239,7 @@ static char *test_check_stream_data_footer(void *context) + const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(in_msg)->buffers); + + // Append a footer ++ bool q2_blocked; + field = qd_compose(QD_PERFORMATIVE_FOOTER, 0); + qd_compose_start_map(field); + qd_compose_insert_symbol(field, "Key1"); +@@ -1239,9 +1247,16 @@ static char *test_check_stream_data_footer(void *context) + qd_compose_insert_symbol(field, "Key2"); + qd_compose_insert_string(field, "Value2"); + qd_compose_end_map(field); +- qd_message_extend(in_msg, field); ++ qd_message_extend(in_msg, field, &q2_blocked); + qd_compose_free(field); + ++ // this small message should not have triggered Q2 ++ assert(DEQ_SIZE(MSG_CONTENT(in_msg)->buffers) < QD_QLIMIT_Q2_UPPER); ++ if (q2_blocked) { ++ result = "Unexpected Q2 block on message extend"; ++ goto exit; ++ } ++ + qd_message_set_receive_complete(in_msg); + + // "fan out" the message +-- +2.20.1 + +From 48b0157f3de58eb2d9c69e448e2c4affb0a1c04a Mon Sep 17 00:00:00 2001 +From: Kenneth Giusti +Date: Thu, 11 Feb 2021 16:24:28 -0500 +Subject: [PATCH 09/16] DISPATCH-1961: Enable Q2 flow control for HTTP/1.x + adaptor + +This closes #1029 +--- + src/adaptors/http1/http1_adaptor.c | 21 +++ + src/adaptors/http1/http1_client.c | 86 ++++++++-- + src/adaptors/http1/http1_private.h | 4 +- + src/adaptors/http1/http1_server.c | 107 +++++++++--- + src/message.c | 14 +- + tests/system_test.py | 4 + + tests/system_tests_http1_adaptor.py | 253 +++++++++++++++++++++++++++- + 7 files changed, 437 insertions(+), 52 deletions(-) + +diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c +index 6ce1eadf..9e4eeb6f 100644 +--- a/src/adaptors/http1/http1_adaptor.c ++++ b/src/adaptors/http1/http1_adaptor.c +@@ -119,6 +119,8 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn) + pn_raw_connection_close(rconn); + } + ++ sys_atomic_destroy(&hconn->q2_restart); ++ + free(hconn->cfg.host); + free(hconn->cfg.port); + free(hconn->cfg.address); +@@ -415,6 +417,25 @@ void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn) + } + + ++// Per-message callback to resume receiving after Q2 is unblocked on the ++// incoming link (to HTTP app). This routine runs on another I/O thread so it ++// must be thread safe! ++// ++void qdr_http1_q2_unblocked_handler(const qd_alloc_safe_ptr_t context) ++{ ++ // prevent the hconn from being deleted while running: ++ sys_mutex_lock(qdr_http1_adaptor->lock); ++ ++ qdr_http1_connection_t *hconn = (qdr_http1_connection_t*)qd_alloc_deref_safe_ptr(&context); ++ if (hconn && hconn->raw_conn) { ++ sys_atomic_set(&hconn->q2_restart, 1); ++ pn_raw_connection_wake(hconn->raw_conn); ++ } ++ ++ sys_mutex_unlock(qdr_http1_adaptor->lock); ++} ++ ++ + // + // Protocol Adaptor Callbacks + // +diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c +index 27b25cde..ff033807 100644 +--- a/src/adaptors/http1/http1_client.c ++++ b/src/adaptors/http1/http1_client.c +@@ -133,6 +133,7 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li) + hconn->adaptor = qdr_http1_adaptor; + hconn->handler_context.handler = &_handle_connection_events; + hconn->handler_context.context = hconn; ++ sys_atomic_init(&hconn->q2_restart, 0); + + hconn->client.next_msg_id = 1; + +@@ -364,6 +365,36 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn) + } + + ++// handle PN_RAW_CONNECTION_READ ++static int _handle_conn_read_event(qdr_http1_connection_t *hconn) ++{ ++ int error = 0; ++ qd_buffer_list_t blist; ++ uintmax_t length; ++ qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length); ++ if (length) { ++ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, ++ "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client (%zu buffers)", ++ hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist)); ++ hconn->in_http1_octets += length; ++ error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length); ++ } ++ return error; ++} ++ ++ ++// handle PN_RAW_CONNECTION_NEED_READ_BUFFERS ++static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn) ++{ ++ // @TODO(kgiusti): backpressure if no credit ++ if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) { ++ int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn); ++ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted", ++ hconn->conn_id, granted); ++ } ++} ++ ++ + // Proton Connection Event Handler + // + static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context) +@@ -424,31 +455,34 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi + } + case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id); +- // @TODO(kgiusti): backpressure if no credit +- if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) { +- int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn); +- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted", +- hconn->conn_id, granted); +- } ++ _handle_conn_need_read_buffers(hconn); + break; + } + case PN_RAW_CONNECTION_WAKE: { ++ int error = 0; + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id); ++ ++ if (sys_atomic_set(&hconn->q2_restart, 0)) { ++ // note: unit tests grep for this log! ++ qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", hconn->conn_id); ++ hconn->q2_blocked = false; ++ error = _handle_conn_read_event(hconn); // restart receiving ++ _handle_conn_need_read_buffers(hconn); ++ } ++ + while (qdr_connection_process(hconn->qdr_conn)) {} ++ ++ if (error) ++ qdr_http1_close_connection(hconn, "Incoming request message failed to parse"); ++ + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Processing done", hconn->conn_id); + break; + } + case PN_RAW_CONNECTION_READ: { +- qd_buffer_list_t blist; +- uintmax_t length; +- qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length); +- if (length) { +- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client", +- hconn->conn_id, hconn->in_link_id, length); +- hconn->in_http1_octets += length; +- int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length); ++ if (!hconn->q2_blocked) { ++ int error = _handle_conn_read_event(hconn); + if (error) +- qdr_http1_close_connection(hconn, "Incoming request message failed to parse"); ++ qdr_http1_close_connection(hconn, "Incoming response message failed to parse"); + } + break; + } +@@ -581,7 +615,7 @@ static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_ + } + + qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, +- "[C%"PRIu64"][L%"PRIu64"] %u request octets encoded", ++ "[C%"PRIu64"][L%"PRIu64"] %u response octets encoded", + hconn->conn_id, hconn->out_link_id, len); + + +@@ -825,6 +859,13 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo + qd_compose_free(hreq->request_props); + hreq->request_props = 0; + ++ // future-proof: ensure the message headers have not caused Q2 ++ // blocking. We only check for Q2 events while adding body data. ++ assert(!qd_message_is_Q2_blocked(hreq->request_msg)); ++ ++ qd_alloc_safe_ptr_t hconn_sp = QD_SAFE_PTR_INIT(hconn); ++ qd_message_set_q2_unblocked_handler(hreq->request_msg, qdr_http1_q2_unblocked_handler, hconn_sp); ++ + // Use up one credit to obtain a delivery and forward to core. If no + // credit is available the request is stalled until the core grants more + // flow. +@@ -848,6 +889,7 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo + static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t len, + bool more) + { ++ bool q2_blocked = false; + _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs); + qdr_http1_connection_t *hconn = hreq->base.hconn; + if (hconn->cfg.event_channel && strcasecmp(h1_codec_request_state_method(hrs), POST_METHOD) != 0) { +@@ -860,8 +902,12 @@ static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b + "[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.", + hconn->conn_id, hconn->in_link_id, len); + +- // @TODO(kgiusti): handle Q2 block event: +- qd_message_stream_data_append(msg, body, 0); ++ qd_message_stream_data_append(msg, body, &q2_blocked); ++ hconn->q2_blocked = hconn->q2_blocked || q2_blocked; ++ if (q2_blocked) { ++ // note: unit tests grep for this log! ++ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] client link blocked on Q2 limit", hconn->conn_id); ++ } + + // + // Notify the router that more data is ready to be pushed out on the delivery +@@ -1635,6 +1681,10 @@ static void _write_pending_response(_client_request_t *hreq) + static void _client_request_free(_client_request_t *hreq) + { + if (hreq) { ++ // deactivate the Q2 callback ++ qd_message_t *msg = hreq->request_dlv ? qdr_delivery_message(hreq->request_dlv) : hreq->request_msg; ++ qd_message_clear_q2_unblocked_handler(msg); ++ + qdr_http1_request_base_cleanup(&hreq->base); + qd_message_free(hreq->request_msg); + if (hreq->request_dlv) { +diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h +index 858efb5d..5ecf4fdf 100644 +--- a/src/adaptors/http1/http1_private.h ++++ b/src/adaptors/http1/http1_private.h +@@ -172,6 +172,8 @@ struct qdr_http1_connection_t { + qdr_link_t *in_link; + uint64_t in_link_id; + int in_link_credit; // provided by router ++ sys_atomic_t q2_restart; // signal to resume receive ++ bool q2_blocked; // stop reading from raw conn + + // Oldest at HEAD + // +@@ -220,7 +222,7 @@ void qdr_http1_error_response(qdr_http1_request_base_t *hreq, + const char *reason); + void qdr_http1_rejected_response(qdr_http1_request_base_t *hreq, + const qdr_error_t *error); +- ++void qdr_http1_q2_unblocked_handler(const qd_alloc_safe_ptr_t context); + + // http1_client.c protocol adaptor callbacks + // +diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c +index 1ef6b2ab..c1fb17b0 100644 +--- a/src/adaptors/http1/http1_server.c ++++ b/src/adaptors/http1/http1_server.c +@@ -152,6 +152,7 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct + hconn->adaptor = qdr_http1_adaptor; + hconn->handler_context.handler = &_handle_connection_events; + hconn->handler_context.context = hconn; ++ sys_atomic_init(&hconn->q2_restart, 0); + hconn->cfg.host = qd_strdup(bconfig->host); + hconn->cfg.port = qd_strdup(bconfig->port); + hconn->cfg.address = qd_strdup(bconfig->address); +@@ -466,6 +467,48 @@ static void _accept_and_settle_request(_server_request_t *hreq) + hreq->request_settled = true; + } + ++ ++// handle PN_RAW_CONNECTION_READ ++static int _handle_conn_read_event(qdr_http1_connection_t *hconn) ++{ ++ int error = 0; ++ qd_buffer_list_t blist; ++ uintmax_t length; ++ ++ qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length); ++ ++ if (HTTP1_DUMP_BUFFERS) { ++ fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length); ++ qd_buffer_t *bb = DEQ_HEAD(blist); ++ while (bb) { ++ fprintf(stdout, " buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]); ++ bb = DEQ_NEXT(bb); ++ } ++ fflush(stdout); ++ } ++ ++ if (length) { ++ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, ++ "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server (%zu buffers)", ++ hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist)); ++ hconn->in_http1_octets += length; ++ error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length); ++ } ++ return error; ++} ++ ++ ++// handle PN_RAW_CONNECTION_NEED_READ_BUFFERS ++static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn) ++{ ++ // @TODO(kgiusti): backpressure if no credit ++ // if (hconn->in_link_credit > 0 */) ++ int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn); ++ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted", ++ hconn->conn_id, granted); ++} ++ ++ + // Proton Raw Connection Events + // + static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context) +@@ -490,6 +533,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi + // message (response body terminated on connection closed) + h1_codec_connection_rx_closed(hconn->http_conn); + pn_raw_connection_close(hconn->raw_conn); ++ hconn->q2_blocked = false; + break; + } + +@@ -557,39 +601,32 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi + break; + } + case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { +- // @TODO(kgiusti): backpressure if no credit +- // if (hconn->in_link_credit > 0 */) +- int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn); +- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted", +- hconn->conn_id, granted); ++ _handle_conn_need_read_buffers(hconn); + break; + } + case PN_RAW_CONNECTION_WAKE: { ++ int error = 0; + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id); ++ ++ if (sys_atomic_set(&hconn->q2_restart, 0)) { ++ // note: unit tests grep for this log! ++ qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] server link unblocked from Q2 limit", hconn->conn_id); ++ hconn->q2_blocked = false; ++ error = _handle_conn_read_event(hconn); // restart receiving ++ _handle_conn_need_read_buffers(hconn); ++ } ++ + while (qdr_connection_process(hconn->qdr_conn)) {} ++ ++ if (error) ++ qdr_http1_close_connection(hconn, "Incoming response message failed to parse"); ++ + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Connection processing complete", hconn->conn_id); + break; + } + case PN_RAW_CONNECTION_READ: { +- qd_buffer_list_t blist; +- uintmax_t length; +- qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length); +- +- if (HTTP1_DUMP_BUFFERS) { +- fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length); +- qd_buffer_t *bb = DEQ_HEAD(blist); +- while (bb) { +- fprintf(stdout, " buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]); +- bb = DEQ_NEXT(bb); +- } +- fflush(stdout); +- } +- +- if (length) { +- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server", +- hconn->conn_id, hconn->in_link_id, length); +- hconn->in_http1_octets += length; +- int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length); ++ if (!hconn->q2_blocked) { ++ int error = _handle_conn_read_event(hconn); + if (error) + qdr_http1_close_connection(hconn, "Incoming response message failed to parse"); + } +@@ -934,6 +971,13 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo + qd_compose_free(rmsg->msg_props); + rmsg->msg_props = 0; + ++ // future-proof: ensure the message headers have not caused Q2 ++ // blocking. We only check for Q2 events while adding body data. ++ assert(!qd_message_is_Q2_blocked(rmsg->msg)); ++ ++ qd_alloc_safe_ptr_t hconn_sp = QD_SAFE_PTR_INIT(hconn); ++ qd_message_set_q2_unblocked_handler(rmsg->msg, qdr_http1_q2_unblocked_handler, hconn_sp); ++ + // start delivery if possible + if (hconn->in_link_credit > 0 && rmsg == DEQ_HEAD(hreq->responses)) { + hconn->in_link_credit -= 1; +@@ -962,6 +1006,7 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b + { + _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs); + qdr_http1_connection_t *hconn = hreq->base.hconn; ++ bool q2_blocked = false; + + qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, + "[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.", +@@ -976,8 +1021,13 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b + + qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv); + +- // @TODO(kgiusti): handle Q2 block event: +- qd_message_stream_data_append(msg, body, 0); ++ ++ qd_message_stream_data_append(msg, body, &q2_blocked); ++ hconn->q2_blocked = hconn->q2_blocked || q2_blocked; ++ if (q2_blocked) { ++ // note: unit tests grep for this log! ++ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] server link blocked on Q2 limit", hconn->conn_id); ++ } + + // + // Notify the router that more data is ready to be pushed out on the delivery +@@ -1518,6 +1568,11 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor, + static void _server_response_msg_free(_server_request_t *hreq, _server_response_msg_t *rmsg) + { + DEQ_REMOVE(hreq->responses, rmsg); ++ ++ // deactivate the Q2 callback ++ qd_message_t *msg = rmsg->dlv ? qdr_delivery_message(rmsg->dlv) : rmsg->msg; ++ qd_message_clear_q2_unblocked_handler(msg); ++ + qd_message_free(rmsg->msg); + qd_compose_free(rmsg->msg_props); + if (rmsg->dlv) { +diff --git a/src/message.c b/src/message.c +index f81d15e9..16794075 100644 +--- a/src/message.c ++++ b/src/message.c +@@ -2936,14 +2936,16 @@ void qd_message_set_q2_unblocked_handler(qd_message_t *msg, + } + + +-void qd_message_clear_Q2_unblocked_handler(qd_message_t *msg) ++void qd_message_clear_q2_unblocked_handler(qd_message_t *msg) + { +- qd_message_content_t *content = MSG_CONTENT(msg); ++ if (msg) { ++ qd_message_content_t *content = MSG_CONTENT(msg); + +- LOCK(content->lock); ++ LOCK(content->lock); + +- content->q2_unblocker.handler = 0; +- qd_nullify_safe_ptr(&content->q2_unblocker.context); ++ content->q2_unblocker.handler = 0; ++ qd_nullify_safe_ptr(&content->q2_unblocker.context); + +- UNLOCK(content->lock); ++ UNLOCK(content->lock); ++ } + } +diff --git a/tests/system_test.py b/tests/system_test.py +index bab62c42..5fec0914 100755 +--- a/tests/system_test.py ++++ b/tests/system_test.py +@@ -733,6 +733,10 @@ class Qdrouterd(Process): + def wait_router_connected(self, router_id, **retry_kwargs): + retry(lambda: self.is_router_connected(router_id), **retry_kwargs) + ++ @property ++ def logfile_path(self): ++ return os.path.join(self.outdir, self.logfile) ++ + + class Tester(object): + """Tools for use by TestCase +diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py +index c182a3dd..9e6776ac 100644 +--- a/tests/system_tests_http1_adaptor.py ++++ b/tests/system_tests_http1_adaptor.py +@@ -27,10 +27,13 @@ from __future__ import absolute_import + from __future__ import print_function + + ++import errno ++import io ++import select + import socket + import sys + from threading import Thread +-from time import sleep ++from time import sleep, time + import uuid + try: + from http.server import HTTPServer, BaseHTTPRequestHandler +@@ -1705,5 +1708,253 @@ class Http1AdaptorBadEndpointsTest(TestCase): + self.assertEqual(1, count) + + ++ ++class Http1AdaptorQ2Standalone(TestCase): ++ """ ++ Force Q2 blocking/recovery on both client and server endpoints. This test ++ uses a single router to ensure both client facing and server facing ++ Q2 components of the HTTP/1.x adaptor are triggered. ++ """ ++ @classmethod ++ def setUpClass(cls): ++ """ ++ Single router configuration with one HTTPListener and one ++ HTTPConnector. ++ """ ++ super(Http1AdaptorQ2Standalone, cls).setUpClass() ++ ++ cls.http_server_port = cls.tester.get_port() ++ cls.http_listener_port = cls.tester.get_port() ++ ++ config = [ ++ ('router', {'mode': 'standalone', ++ 'id': 'RowdyRoddyRouter', ++ 'allowUnsettledMulticast': 'yes'}), ++ ('listener', {'role': 'normal', ++ 'port': cls.tester.get_port()}), ++ ('httpListener', {'port': cls.http_listener_port, ++ 'protocolVersion': 'HTTP1', ++ 'address': 'testServer'}), ++ ('httpConnector', {'port': cls.http_server_port, ++ 'protocolVersion': 'HTTP1', ++ 'address': 'testServer'}), ++ ('address', {'prefix': 'closest', 'distribution': 'closest'}), ++ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), ++ ] ++ config = Qdrouterd.Config(config) ++ cls.INT_A = cls.tester.qdrouterd("TestBadEndpoints", config, wait=True) ++ cls.INT_A.listener = cls.INT_A.addresses[0] ++ ++ ++ def _write_until_full(self, sock, data, timeout): ++ """ ++ Write data to socket until either all data written or timeout. ++ Return the number of bytes written, which will == len(data) if timeout ++ not hit ++ """ ++ sock.setblocking(0) ++ sent = 0 ++ ++ while sent < len(data): ++ try: ++ _, rw, _ = select.select([], [sock], [], timeout) ++ except select.error as serror: ++ if serror[0] == errno.EINTR: ++ print("ignoring interrupt from select(): %s" % str(serror)) ++ continue ++ raise # assuming fatal... ++ if rw: ++ sent += sock.send(data[sent:]) ++ else: ++ break # timeout ++ return sent ++ ++ def _read_until_empty(self, sock, timeout): ++ """ ++ Read data from socket until timeout occurs. Return read data. ++ """ ++ sock.setblocking(0) ++ data = b'' ++ ++ while True: ++ try: ++ rd, _, _ = select.select([sock], [], [], timeout) ++ except select.error as serror: ++ if serror[0] == errno.EINTR: ++ print("ignoring interrupt from select(): %s" % str(serror)) ++ continue ++ raise # assuming fatal... ++ if rd: ++ data += sock.recv(4096) ++ else: ++ break # timeout ++ return data ++ ++ def test_01_backpressure_client(self): ++ """ ++ Trigger Q2 backpressure against the HTTP client. ++ """ ++ ++ # create a listener socket to act as the server service ++ server_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ++ server_listener.settimeout(TIMEOUT) ++ server_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ++ server_listener.bind(('', self.http_server_port)) ++ server_listener.listen(1) ++ ++ # block until router connects ++ server_sock, host_port = server_listener.accept() ++ server_sock.settimeout(0.5) ++ server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) ++ ++ # create a client connection to the router ++ client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ++ client_sock.settimeout(0.5) ++ client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) ++ client_sock.connect((host_port[0], self.http_listener_port)) ++ ++ # send a Very Large PUSH request, expecting it to block at some point ++ ++ push_req_hdr = b'PUSH / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n' ++ count = self._write_until_full(client_sock, push_req_hdr, 1.0) ++ self.assertEqual(len(push_req_hdr), count) ++ ++ chunk = b'8000\r\n' + b'X' * 0x8000 + b'\r\n' ++ last_chunk = b'0 \r\n\r\n' ++ count = 0 ++ deadline = time() + TIMEOUT ++ while deadline >= time(): ++ count = self._write_until_full(client_sock, chunk, 5.0) ++ if count < len(chunk): ++ break ++ self.assertFalse(time() > deadline, ++ "Client never blocked as expected!") ++ ++ # client should now be in Q2 block. Drain the server to unblock Q2 ++ _ = self._read_until_empty(server_sock, 2.0) ++ ++ # finish the PUSH ++ if count: ++ remainder = self._write_until_full(client_sock, chunk[count:], 1.0) ++ self.assertEqual(len(chunk), count + remainder) ++ ++ count = self._write_until_full(client_sock, last_chunk, 1.0) ++ self.assertEqual(len(last_chunk), count) ++ ++ # receive the request and reply ++ _ = self._read_until_empty(server_sock, 2.0) ++ ++ response = b'HTTP/1.1 201 Created\r\nContent-Length: 0\r\n\r\n' ++ count = self._write_until_full(server_sock, response, 1.0) ++ self.assertEqual(len(response), count) ++ ++ # complete the response read ++ _ = self._read_until_empty(client_sock, 2.0) ++ self.assertEqual(len(response), len(_)) ++ ++ client_sock.shutdown(socket.SHUT_RDWR) ++ client_sock.close() ++ ++ server_sock.shutdown(socket.SHUT_RDWR) ++ server_sock.close() ++ ++ server_listener.shutdown(socket.SHUT_RDWR) ++ server_listener.close() ++ ++ # search the router log file to verify Q2 was hit ++ ++ block_ct = 0 ++ unblock_ct = 0 ++ with io.open(self.INT_A.logfile_path) as f: ++ for line in f: ++ if 'client link blocked on Q2 limit' in line: ++ block_ct += 1 ++ if 'client link unblocked from Q2 limit' in line: ++ unblock_ct += 1 ++ self.assertTrue(block_ct > 0) ++ self.assertEqual(block_ct, unblock_ct) ++ ++ def test_02_backpressure_server(self): ++ """ ++ Trigger Q2 backpressure against the HTTP server. ++ """ ++ small_get_req = b'GET / HTTP/1.1\r\nContent-Length: 0\r\n\r\n' ++ ++ # create a listener socket to act as the server service ++ server_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ++ server_listener.settimeout(TIMEOUT) ++ server_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ++ server_listener.bind(('', self.http_server_port)) ++ server_listener.listen(1) ++ ++ # block until router connects ++ server_sock, host_port = server_listener.accept() ++ server_sock.settimeout(0.5) ++ server_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) ++ ++ # create a client connection to the router ++ client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ++ client_sock.settimeout(0.5) ++ client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) ++ client_sock.connect((host_port[0], self.http_listener_port)) ++ ++ # send GET request - expect this to be successful ++ count = self._write_until_full(client_sock, small_get_req, 1.0) ++ self.assertEqual(len(small_get_req), count) ++ ++ request = self._read_until_empty(server_sock, 5.0) ++ self.assertEqual(len(small_get_req), len(request)) ++ ++ # send a Very Long response, expecting it to block at some point ++ chunk = b'8000\r\n' + b'X' * 0x8000 + b'\r\n' ++ last_chunk = b'0 \r\n\r\n' ++ response = b'HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n' ++ ++ count = self._write_until_full(server_sock, response, 1.0) ++ self.assertEqual(len(response), count) ++ ++ count = 0 ++ deadline = time() + TIMEOUT ++ while deadline >= time(): ++ count = self._write_until_full(server_sock, chunk, 5.0) ++ if count < len(chunk): ++ break ++ self.assertFalse(time() > deadline, ++ "Server never blocked as expected!") ++ ++ # server should now be in Q2 block. Drain the client to unblock Q2 ++ _ = self._read_until_empty(client_sock, 2.0) ++ ++ # finish the response ++ if count: ++ remainder = self._write_until_full(server_sock, chunk[count:], 1.0) ++ self.assertEqual(len(chunk), count + remainder) ++ ++ count = self._write_until_full(server_sock, last_chunk, 1.0) ++ self.assertEqual(len(last_chunk), count) ++ server_sock.shutdown(socket.SHUT_RDWR) ++ server_sock.close() ++ ++ _ = self._read_until_empty(client_sock, 1.0) ++ client_sock.shutdown(socket.SHUT_RDWR) ++ client_sock.close() ++ ++ server_listener.shutdown(socket.SHUT_RDWR) ++ server_listener.close() ++ ++ # search the router log file to verify Q2 was hit ++ ++ block_ct = 0 ++ unblock_ct = 0 ++ with io.open(self.INT_A.logfile_path) as f: ++ for line in f: ++ if 'server link blocked on Q2 limit' in line: ++ block_ct += 1 ++ if 'server link unblocked from Q2 limit' in line: ++ unblock_ct += 1 ++ self.assertTrue(block_ct > 0) ++ self.assertEqual(block_ct, unblock_ct) ++ ++ + if __name__ == '__main__': + unittest.main(main_module()) +-- +2.20.1 + +From 4816af35e8ffcfa3621301c97cc602558aa0d5d7 Mon Sep 17 00:00:00 2001 +From: Kenneth Giusti +Date: Tue, 16 Feb 2021 20:25:21 -0500 +Subject: [PATCH 10/16] DISPATCH-1961: flush any Q2 blocked buffers on conn + close + +This closes #1039 +--- + src/adaptors/http1/http1_server.c | 11 ++++++++--- + 1 file changed, 8 insertions(+), 3 deletions(-) + +diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c +index c1fb17b0..76c35cb7 100644 +--- a/src/adaptors/http1/http1_server.c ++++ b/src/adaptors/http1/http1_server.c +@@ -529,11 +529,15 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi + break; + } + case PN_RAW_CONNECTION_CLOSED_READ: { ++ if (hconn->q2_blocked) { ++ hconn->q2_blocked = false; ++ // drain any pending buffers blocked by Q2 ++ _handle_conn_read_event(hconn); ++ } + // notify the codec so it can complete the current response + // message (response body terminated on connection closed) + h1_codec_connection_rx_closed(hconn->http_conn); + pn_raw_connection_close(hconn->raw_conn); +- hconn->q2_blocked = false; + break; + } + +@@ -601,7 +605,8 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi + break; + } + case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { +- _handle_conn_need_read_buffers(hconn); ++ if (!hconn->q2_blocked) ++ _handle_conn_need_read_buffers(hconn); + break; + } + case PN_RAW_CONNECTION_WAKE: { +@@ -1136,7 +1141,7 @@ void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t *adaptor, + + if (hconn->in_link_credit > 0) { + +- if (hconn->raw_conn) ++ if (hconn->raw_conn && !hconn->q2_blocked) + qda_raw_conn_grant_read_buffers(hconn->raw_conn); + + // check for pending responses that are blocked for credit +-- +2.20.1 + +From 67cf9a8d259052f2aed1c3b8a0fea5d1fde5248c Mon Sep 17 00:00:00 2001 +From: Ganesh Murthy +Date: Thu, 18 Feb 2021 14:27:27 -0500 +Subject: [PATCH 11/16] DISPATCH-1952: Add back qd_bitmask_t to leak supression + list + +--- + src/alloc_pool.c | 1 + + 1 file changed, 1 insertion(+) + +diff --git a/src/alloc_pool.c b/src/alloc_pool.c +index 2ab65675..79c14a27 100644 +--- a/src/alloc_pool.c ++++ b/src/alloc_pool.c +@@ -102,6 +102,7 @@ static const char *leaking_types[] = { + "qdr_field_t", + "qdr_link_work_t", + "qd_buffer_t", ++ "qd_bitmask_t", + + "qd_parsed_field_t", // DISPATCH-1701 + "qdr_delivery_ref_t", // DISPATCH-1702 +-- +2.20.1 + +From c32f02c27d0471adf20630f830d819348cdc9611 Mon Sep 17 00:00:00 2001 +From: Ganesh Murthy +Date: Thu, 18 Feb 2021 14:07:52 -0500 +Subject: [PATCH 12/16] DISPATCH-1970: Zero out the conn->config pointer since + it has already been freed. This closes #1046. + +--- + src/adaptors/http2/http2_adaptor.c | 12 +++++------- + 1 file changed, 5 insertions(+), 7 deletions(-) + +diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c +index 25347e40..7ffdb0d9 100644 +--- a/src/adaptors/http2/http2_adaptor.c ++++ b/src/adaptors/http2/http2_adaptor.c +@@ -382,10 +382,6 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo + buff = DEQ_HEAD(http_conn->granted_read_buffs); + } + +- if (http_conn->delete_egress_connections) { +- http_conn->config = 0; +- } +- + qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Freeing http2 connection in free_qdr_http2_connection", http_conn->conn_id); + + free_qdr_http2_connection_t(http_conn); +@@ -872,13 +868,13 @@ static void _http_record_request(qdr_http2_connection_t *conn, qdr_http2_stream_ + remote_addr = conn->remote_address; + } + } else { +- remote_addr = conn->config->host; ++ remote_addr = conn->config?conn->config->host:0; + } + qd_http_record_request(http2_adaptor->core, + stream_data->method, + stream_data->request_status, +- conn->config->address, +- remote_addr, conn->config->site, ++ conn->config?conn->config->address:0, ++ remote_addr, conn->config?conn->config->site:0, + stream_data->remote_site, + conn->ingress, stream_data->bytes_in, stream_data->bytes_out, + stream_data->stop && stream_data->start ? stream_data->stop - stream_data->start : 0); +@@ -2094,6 +2090,8 @@ static void handle_disconnected(qdr_http2_connection_t* conn) + conn->stream_dispatcher_stream_data = 0; + + if (conn->delete_egress_connections) { ++ // The config has already been freed by the qd_http_connector_decref() function, set it to zero here ++ conn->config = 0; + close_connections(conn); + } + } +-- +2.20.1 + +From e4f2a9090d3fee36efeff903dc9a628ea77135ff Mon Sep 17 00:00:00 2001 +From: Chuck Rolke +Date: Mon, 22 Feb 2021 11:24:15 -0500 +Subject: [PATCH 13/16] DISPATCH-1968: Avoid proton calls on a closed raw + connections + + * Do not write new buffers if connection is CLOSED_WRITE + * Do not call connection_wake if CLOSED_READ or CLOSED_WRITE + +This closes #1047 +--- + src/adaptors/tcp_adaptor.c | 45 ++++++++++++++++++++++++++++++++++---- + 1 file changed, 41 insertions(+), 4 deletions(-) + +diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c +index 6cc8855f..69bed1ab 100644 +--- a/src/adaptors/tcp_adaptor.c ++++ b/src/adaptors/tcp_adaptor.c +@@ -56,6 +56,8 @@ struct qdr_tcp_connection_t { + bool egress_dispatcher; + bool connector_closed;//only used if egress_dispatcher=true + bool in_list; // This connection is in the adaptor's connections list ++ bool raw_closed_read; ++ bool raw_closed_write; + qdr_delivery_t *initial_delivery; + qd_timer_t *activate_timer; + qd_bridge_config_t config; +@@ -122,6 +124,9 @@ static void on_activate(void *context) + + static void grant_read_buffers(qdr_tcp_connection_t *conn) + { ++ if (conn->raw_closed_read) ++ return; ++ + pn_raw_buffer_t raw_buffers[READ_BUFFERS]; + // Give proactor more read buffers for the socket + if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) { +@@ -231,6 +236,31 @@ static int handle_incoming(qdr_tcp_connection_t *conn) + return count; + } + ++ ++static void flush_outgoing_buffs(qdr_tcp_connection_t *conn) ++{ ++ // Flush buffers staged for writing to raw conn ++ // and free possible references to stream data objects. ++ if (conn->outgoing_buff_count > 0) { ++ for (size_t i = conn->outgoing_buff_idx; ++ i < conn->outgoing_buff_idx + conn->outgoing_buff_count; ++ ++i) { ++ if (conn->outgoing_buffs[i].context) { ++ qd_message_stream_data_release( ++ (qd_message_stream_data_t*)conn->outgoing_buffs[i].context); ++ } ++ } ++ } ++ conn->outgoing_buff_count = 0; ++ ++ // Flush in-progress stream data object ++ if (conn->outgoing_stream_data) { ++ free_qd_message_stream_data_t(conn->outgoing_stream_data); ++ conn->outgoing_stream_data = 0; ++ } ++} ++ ++ + static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) + { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing tcp_connection %p", tc->conn_id, (void*) tc); +@@ -240,9 +270,7 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) + if (tc->activate_timer) { + qd_timer_free(tc->activate_timer); + } +- if (tc->outgoing_stream_data) { +- free_qd_message_stream_data_t(tc->outgoing_stream_data); +- } ++ flush_outgoing_buffs(tc); + sys_mutex_free(tc->activation_lock); + //proactor will free the socket + free_qdr_tcp_connection_t(tc); +@@ -348,6 +376,7 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r + return used; + } + ++ + static bool write_outgoing_buffs(qdr_tcp_connection_t *conn) + { + // Send the outgoing buffs to pn_raw_conn. +@@ -383,6 +412,12 @@ static bool write_outgoing_buffs(qdr_tcp_connection_t *conn) + static void handle_outgoing(qdr_tcp_connection_t *conn) + { + if (conn->outstream) { ++ if (conn->raw_closed_write) { ++ // flush outgoing buffers and free attached stream_data objects ++ flush_outgoing_buffs(conn); ++ // give no more buffers to raw connection ++ return; ++ } + qd_message_t *msg = qdr_delivery_message(conn->outstream); + bool read_more_body = true; + +@@ -534,11 +569,13 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void + } + case PN_RAW_CONNECTION_CLOSED_READ: { + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id); ++ conn->raw_closed_read = true; + pn_raw_connection_close(conn->pn_raw_conn); + break; + } + case PN_RAW_CONNECTION_CLOSED_WRITE: { + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id); ++ conn->raw_closed_write = true; + pn_raw_connection_close(conn->pn_raw_conn); + break; + } +@@ -1186,7 +1223,7 @@ static void qdr_tcp_activate(void *notused, qdr_connection_t *c) + if (context) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; + sys_mutex_lock(conn->activation_lock); +- if (conn->pn_raw_conn) { ++ if (conn->pn_raw_conn && !(conn->raw_closed_read || conn->raw_closed_write)) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: waking raw connection", conn->conn_id); + pn_raw_connection_wake(conn->pn_raw_conn); + sys_mutex_unlock(conn->activation_lock); +-- +2.20.1 + +From 29911e6bb37d542ec662ae4af947e98c93619591 Mon Sep 17 00:00:00 2001 +From: Chuck Rolke +Date: Tue, 23 Feb 2021 16:48:05 -0500 +Subject: [PATCH 14/16] DISPATCH-1947: TCP Adaptor flow control + +This closes #1056 +--- + src/adaptors/tcp_adaptor.c | 88 +++++++++++++++++++++++++++++++++----- + 1 file changed, 78 insertions(+), 10 deletions(-) + +diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c +index 69bed1ab..0b831231 100644 +--- a/src/adaptors/tcp_adaptor.c ++++ b/src/adaptors/tcp_adaptor.c +@@ -17,6 +17,7 @@ + * under the License. + */ + ++#include "tcp_adaptor.h" + #include + #include + #include +@@ -26,7 +27,6 @@ + #include "qpid/dispatch/ctools.h" + #include "qpid/dispatch/protocol_adaptor.h" + #include "delivery.h" +-#include "tcp_adaptor.h" + #include + #include + +@@ -78,6 +78,9 @@ struct qdr_tcp_connection_t { + int outgoing_buff_count; // number of buffers with data + int outgoing_buff_idx; // first buffer with data + ++ sys_atomic_t q2_restart; // signal to resume receive ++ bool q2_blocked; // stop reading from raw conn ++ + DEQ_LINKS(qdr_tcp_connection_t); + }; + +@@ -148,7 +151,37 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn) + } + } + +-static int handle_incoming(qdr_tcp_connection_t *conn) ++ ++// Per-message callback to resume receiving after Q2 is unblocked on the ++// incoming link. ++// This routine must be thread safe: the thread on which it is running ++// is not an IO thread that owns the underlying pn_raw_conn. ++// ++void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context) ++{ ++ qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*)qd_alloc_deref_safe_ptr(&context); ++ if (tc == 0) { ++ // bad news. ++ assert(false); ++ return; ++ } ++ ++ // prevent the tc from being deleted while running: ++ sys_mutex_lock(tc->activation_lock); ++ ++ if (tc && tc->pn_raw_conn) { ++ sys_atomic_set(&tc->q2_restart, 1); ++ pn_raw_connection_wake(tc->pn_raw_conn); ++ } ++ ++ sys_mutex_unlock(tc->activation_lock); ++} ++ ++ ++// Fetch incoming raw incoming buffers from proton and pass them to ++// existing delivery or create a new delivery. ++// If close is pending then do not give more buffers to proton. ++static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending) + { + // + // Don't initiate an ingress stream message if we don't yet have a reply-to address and credit. +@@ -163,6 +196,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn) + return 0; + } + ++ // ++ // Don't read from proton if in Q2 holdoff ++ // ++ if (conn->q2_blocked) { ++ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_incoming q2_blocked", conn->conn_id); ++ return 0; ++ } ++ ++ // Read all buffers available from proton. ++ // Collect buffers for ingress; free empty buffers. + qd_buffer_list_t buffers; + DEQ_INIT(buffers); + pn_raw_buffer_t raw_buffers[READ_BUFFERS]; +@@ -182,14 +225,20 @@ static int handle_incoming(qdr_tcp_connection_t *conn) + } + } + } +- + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %zu read buffers", conn->conn_id, DEQ_SIZE(buffers)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read buffers", conn->conn_id, free_count); +- grant_read_buffers(conn); ++ ++ // Only grant more buffers to proton for reading if close is not pending ++ if (!close_pending) { ++ grant_read_buffers(conn); ++ } + + if (conn->instream) { +- // @TODO(kgiusti): handle Q2 block event: +- qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, 0); ++ qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, &conn->q2_blocked); ++ if (conn->q2_blocked) { ++ // note: unit tests grep for this log! ++ qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] client link blocked on Q2 limit", conn->conn_id); ++ } + qdr_delivery_continue(tcp_adaptor->core, conn->instream, false); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count); + } else { +@@ -230,6 +279,10 @@ static int handle_incoming(qdr_tcp_connection_t *conn) + qd_message_compose_2(msg, props, false); + qd_compose_free(props); + ++ // set up message q2 unblocked callback handler ++ qd_alloc_safe_ptr_t conn_sp = QD_SAFE_PTR_INIT(conn); ++ qd_message_set_q2_unblocked_handler(msg, qdr_tcp_q2_unblocked_handler, conn_sp); ++ + conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 0, 0); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating message with %i bytes", conn->conn_id, conn->incoming_id, count); + } +@@ -237,10 +290,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn) + } + + ++static int handle_incoming(qdr_tcp_connection_t *conn) ++{ ++ // Normal incoming runs with no close pending ++ return handle_incoming_impl(conn, false); ++} ++ + static void flush_outgoing_buffs(qdr_tcp_connection_t *conn) + { + // Flush buffers staged for writing to raw conn +- // and free possible references to stream data objects. ++ // and release any references to stream data objects. + if (conn->outgoing_buff_count > 0) { + for (size_t i = conn->outgoing_buff_idx; + i < conn->outgoing_buff_idx + conn->outgoing_buff_count; +@@ -263,10 +322,10 @@ static void flush_outgoing_buffs(qdr_tcp_connection_t *conn) + + static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) + { +- qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing tcp_connection %p", tc->conn_id, (void*) tc); + free(tc->reply_to); + free(tc->remote_address); + free(tc->global_id); ++ sys_atomic_destroy(&tc->q2_restart); + if (tc->activate_timer) { + qd_timer_free(tc->activate_timer); + } +@@ -278,7 +337,6 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) + + static void handle_disconnected(qdr_tcp_connection_t* conn) + { +- qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_disconnected", conn->conn_id); + if (conn->instream) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, conn->incoming_id); + qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); +@@ -552,8 +610,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void + switch (pn_event_type(e)) { + case PN_RAW_CONNECTION_CONNECTED: { + if (conn->ingress) { +- qdr_tcp_connection_ingress_accept(conn); + qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Ingress accepted to %s from %s (global_id=%s)", conn->conn_id, conn->config.host_port, conn->remote_address, conn->global_id); ++ qdr_tcp_connection_ingress_accept(conn); + break; + } else { + conn->remote_address = get_address_string(conn->pn_raw_conn); +@@ -569,6 +627,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void + } + case PN_RAW_CONNECTION_CLOSED_READ: { + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id); ++ conn->q2_blocked = false; ++ handle_incoming_impl(conn, true); + conn->raw_closed_read = true; + pn_raw_connection_close(conn->pn_raw_conn); + break; +@@ -601,6 +661,12 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void + } + case PN_RAW_CONNECTION_WAKE: { + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE", conn->conn_id); ++ if (sys_atomic_set(&conn->q2_restart, 0)) { ++ // note: unit tests grep for this log! ++ qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", conn->conn_id); ++ conn->q2_blocked = false; ++ handle_incoming(conn); ++ } + while (qdr_connection_process(conn->qdr_conn)) {} + break; + } +@@ -646,6 +712,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* liste + tc->context.handler = &handle_connection_event; + tc->config = listener->config; + tc->server = listener->server; ++ sys_atomic_init(&tc->q2_restart, 0); + tc->pn_raw_conn = pn_raw_connection(); + pn_raw_connection_set_context(tc->pn_raw_conn, tc); + //the following call will cause a PN_RAW_CONNECTION_CONNECTED +@@ -739,6 +806,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi + tc->context.handler = &handle_connection_event; + tc->config = *config; + tc->server = server; ++ sys_atomic_init(&tc->q2_restart, 0); + tc->conn_id = qd_server_allocate_connection_id(tc->server); + + // +-- +2.20.1 + +From 29d04211d03b27049c59418a2d04c76fb60e5fa9 Mon Sep 17 00:00:00 2001 +From: Ganesh Murthy +Date: Tue, 2 Mar 2021 09:25:11 -0500 +Subject: [PATCH 15/16] DISPATCH-1984: Matched the method signature of + _client_rx_body_cb with the implementation. Modified uintmax_t len to size_t + len + +--- + src/adaptors/http1/http1_client.c | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c +index ff033807..422dd6ac 100644 +--- a/src/adaptors/http1/http1_client.c ++++ b/src/adaptors/http1/http1_client.c +@@ -106,7 +106,7 @@ static int _client_rx_response_cb(h1_codec_request_state_t *lib_rs, + uint32_t version_minor); + static int _client_rx_header_cb(h1_codec_request_state_t *lib_rs, const char *key, const char *value); + static int _client_rx_headers_done_cb(h1_codec_request_state_t *lib_rs, bool has_body); +-static int _client_rx_body_cb(h1_codec_request_state_t *lib_rs, qd_buffer_list_t *body, uintmax_t len, bool more); ++static int _client_rx_body_cb(h1_codec_request_state_t *lib_rs, qd_buffer_list_t *body, size_t len, bool more); + static void _client_rx_done_cb(h1_codec_request_state_t *lib_rs); + static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool cancelled); + static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context); +-- +2.20.1 + +From 6bb2eb534f9243caeb00891821be8f0d133e4822 Mon Sep 17 00:00:00 2001 +From: Ganesh Murthy +Date: Thu, 4 Mar 2021 12:42:00 -0500 +Subject: [PATCH 16/16] DISPATCH-1988: Added code to start routing a delivery + only if there is some data in the content->buffers or in content->pending. If + we never receive any data and the receive is complete, the delivery is + rejected. This closes #1062 + +--- + include/qpid/dispatch/message.h | 8 ++ + src/message.c | 20 ++++- + src/router_node.c | 36 ++++++++ + tests/system_tests_link_routes.py | 145 ++++++++++++++++++++++++++++++ + 4 files changed, 207 insertions(+), 2 deletions(-) + +diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h +index 07e144ad..1558dde3 100644 +--- a/include/qpid/dispatch/message.h ++++ b/include/qpid/dispatch/message.h +@@ -247,6 +247,14 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery); + */ + qd_message_t * qd_get_message_context(pn_delivery_t *delivery); + ++/** ++ * Returns true if there is at least one non-empty buffer at the head of the content->buffers list ++ * or if the content->pending buffer is non-empty. ++ * ++ * @param msg A pointer to a message. ++ */ ++bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg); ++ + /** + * Send the message outbound on an outgoing link. + * +diff --git a/src/message.c b/src/message.c +index 16794075..3508a996 100644 +--- a/src/message.c ++++ b/src/message.c +@@ -1255,8 +1255,6 @@ void qd_message_add_fanout(qd_message_t *in_msg, + // DISPATCH-1590: content->buffers may not be set up yet if + // content->pending is the first buffer and it is not yet full. + if (!buf) { +- // assumption: proton will never signal a readable delivery if there is +- // no data at all. + assert(content->pending && qd_buffer_size(content->pending) > 0); + DEQ_INSERT_TAIL(content->buffers, content->pending); + content->pending = 0; +@@ -1422,6 +1420,24 @@ qd_message_t * qd_get_message_context(pn_delivery_t *delivery) + return 0; + } + ++bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg) ++{ ++ if (!msg) ++ return false; ++ ++ if (MSG_CONTENT(msg)) { ++ if (DEQ_SIZE(MSG_CONTENT(msg)->buffers) > 0) { ++ qd_buffer_t *buf = DEQ_HEAD(MSG_CONTENT(msg)->buffers); ++ if (buf && qd_buffer_size(buf) > 0) ++ return true; ++ } ++ if (MSG_CONTENT(msg)->pending && qd_buffer_size(MSG_CONTENT(msg)->pending) > 0) ++ return true; ++ } ++ ++ return false; ++} ++ + + qd_message_t *qd_message_receive(pn_delivery_t *delivery) + { +diff --git a/src/router_node.c b/src/router_node.c +index 70d98f91..bccf6cfd 100644 +--- a/src/router_node.c ++++ b/src/router_node.c +@@ -451,6 +451,42 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) + qd_message_t *msg = qd_message_receive(pnd); + bool receive_complete = qd_message_receive_complete(msg); + ++ // ++ // The very first time AMQP_rx_handler is called on a PN_DELIVERY event, it calls qd_message_receive(). When qd_message_receive() returns, we check here if ++ // there are any data in the content buffers. If there is no content in the buffers, there is no reason to route the delivery. We will wait for some data ++ // in the buffers before we start to route the delivery. ++ // Notice that the if statement checks for the existence of a delivery (qdr_delivery_t). Existence of a delivery means that the delivery has been routed when ++ // there was data in the buffers (When a delivery has been routed successfully, the delivery (qdr_delivery_t) will be non null) ++ // ++ // The following if statement will deal with the following cases:- ++ // 1. We receive one empty transfer frame with more=true followed by another empty transfer frame with (more=false and abort=true) or with just more=false ++ // In this case, there is no data at all in the message content buffers, we will reject the message when receive_complete=true. We will never route this ++ // delivery, so core thread will not be involved ++ // 2. We receive 2 or more empty transfer frames with more=true followed by another empty transfer frame with (more=false and abort=true) or with just more=false ++ // This case is similar to #1. We have no content in any of the buffers, we will reject this message after receive_complete=true. We will never route this ++ // delivery, so core thread will not be involved ++ // 3. Exactly one empty transfer frame with more=false and abort=false ++ // In this case, again there is still no content in any of the buffers, we will reject this message. Again, we will not route this message, so the core thread is not involved. ++ // ++ if (!delivery && !qd_message_has_data_in_content_or_pending_buffers(msg)) { ++ if (receive_complete) { ++ // There is no qdr_delivery_t (delivery) yet which means this message has not been routed yet (the first run of this function is not complete yet) and ++ // the message is fully received (receive_complete=true) but there is no content in the message buffers. ++ // This is only possible if there were one or more empty transfer frames. ++ // Since there is nothing in the message, we will reject it (AMQP message must have a non empty message body) ++ pn_link_flow(pn_link, 1); ++ if (pn_delivery_aborted(pnd)) ++ qd_message_set_discard(msg, true); ++ pn_delivery_update(pnd, PN_REJECTED); ++ pn_delivery_settle(pnd); ++ // qd_message_free will free all the associated content buffers and also the content->pending buffer ++ qd_message_free(msg); ++ qd_log(router->log_source, QD_LOG_TRACE, "Message rejected due to empty message"); ++ } ++ ++ return false; ++ } ++ + if (!qd_message_oversize(msg)) { + // message not rejected as oversize + if (receive_complete) { +diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py +index 1334618d..80ce0cdd 100644 +--- a/tests/system_tests_link_routes.py ++++ b/tests/system_tests_link_routes.py +@@ -26,6 +26,7 @@ import os + from time import sleep, time + from threading import Event + from subprocess import PIPE, STDOUT ++import socket + + from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process, TestTimeout, \ + AsyncTestSender, AsyncTestReceiver, MgmtMsgProxy, unittest, QdManager +@@ -1761,6 +1762,150 @@ class LinkRouteDrainTest(TestCase): + self.assertEquals(drain_receiver.error, None) + + ++class EmptyTransferTest(TestCase): ++ @classmethod ++ def setUpClass(cls): ++ super(EmptyTransferTest, cls).setUpClass() ++ cls.ROUTER_LISTEN_PORT = cls.tester.get_port() ++ ++ config = [ ++ ('router', {'mode': 'standalone', 'id': 'QDR.A'}), ++ # the client will connect to this listener ++ ('listener', {'role': 'normal', ++ 'host': '0.0.0.0', ++ 'port': cls.ROUTER_LISTEN_PORT, ++ 'saslMechanisms': 'ANONYMOUS'}), ++ # to connect to the fake broker ++ ('connector', {'name': 'broker', ++ 'role': 'route-container', ++ 'host': '127.0.0.1', ++ 'port': cls.tester.get_port(), ++ 'saslMechanisms': 'ANONYMOUS'}), ++ ('linkRoute', ++ {'prefix': 'examples', 'containerId': 'FakeBroker', ++ 'direction': 'in'}), ++ ('linkRoute', ++ {'prefix': 'examples', 'containerId': 'FakeBroker', ++ 'direction': 'out'}) ++ ] ++ config = Qdrouterd.Config(config) ++ cls.router = cls.tester.qdrouterd('A', config, wait=False) ++ ++ def _fake_broker(self, cls): ++ """ ++ Spawn a fake broker listening on the broker's connector ++ """ ++ fake_broker = cls(self.router.connector_addresses[0]) ++ # wait until the connection to the fake broker activates ++ self.router.wait_connectors() ++ return fake_broker ++ ++ def test_DISPATCH_1988(self): ++ fake_broker = self._fake_broker(FakeBroker) ++ AMQP_OPEN_BEGIN_ATTACH = bytearray( ++ b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00' ++ b'\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06' ++ b'\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21' ++ b'\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00' ++ b'\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00' ++ b'\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b' ++ b'\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72' ++ b'\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b' ++ b'\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0' ++ b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70' ++ b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00') ++ ++ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ++ # Connect to the router listening port and send an amqp, open, ++ # begin, attach. The attach is sent on the link ++ # routed address, "examples" ++ s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT)) ++ s.sendall(AMQP_OPEN_BEGIN_ATTACH) ++ ++ # Give a second for the attach to propagate to the broker and ++ # for the broker to send a response attach ++ sleep(1) ++ data = s.recv(2048) ++ self.assertIn("examples", repr(data)) ++ ++ # First send a message on link routed address "examples" with ++ # message body of "message 0" ++ # Verify the the sent message has been accepted. ++ TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00' ++ + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01' ++ + b'\xa0\x01\x01\x43\x42' ++ + b'\x40\x40\x40\x40\x40\x42\x00\x53' ++ + b'\x73\xc0\x02\x01\x44\x00\x53\x77' ++ + b'\xa1\x09\x6d\x65\x73\x73\x61\x67' ++ + b'\x65\x20\x30') ++ s.sendall(TRANSFER_1) ++ sleep(0.5) ++ data = s.recv(1024) ++ # The delivery has been accepted. ++ self.assertIn("x00S$E", repr(data)) ++ ++ # Test case 1 ++ # Send an empty transfer frame to the router and you should ++ # receive a rejected disposition from the router. ++ # Without the fix for DISPATCH_1988, ++ # upon sending this EMPTY_TRANSFER ++ # the router crashes with the following assert ++ # qpid-dispatch/src/message.c:1260: qd_message_add_fanout: Assertion `content->pending && qd_buffer_size(content->pending) > 0' failed. ++ # This is the empty transfer frame that is sent to the router. ++ # [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false, batchable=false] ++ EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00' ++ + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52' ++ + b'\x02\xa0\x01\x02\x43\x42' ++ + b'\x42\x40\x40\x40\x40\x42') ++ s.sendall(EMPTY_TRANSFER) ++ sleep(1) ++ data = s.recv(1024) ++ # The delivery has been rejected. ++ self.assertIn("x00S%E", repr(data)) ++ ++ # Let's send another transfer to make sure that the ++ # router has not crashed. ++ TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00' ++ + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x03' ++ + b'\xa0\x01\x03\x43\x42' ++ + b'\x40\x40\x40\x40\x40\x42\x00\x53' ++ + b'\x73\xc0\x02\x01\x44\x00\x53\x77' ++ + b'\xa1\x09\x6d\x65\x73\x73\x61\x67' ++ + b'\x65\x20\x30') ++ s.sendall(TRANSFER_1) ++ sleep(0.5) ++ data = s.recv(1024) ++ # The delivery has been accepted. ++ self.assertIn("x00S$E", repr(data)) ++ ++ # Test case 2 ++ # Now, send two empty transfer frames, first transfer has ++ # more=true and the next transfer has more=false. ++ # This will again be rejected by the router. ++ # The following are the two transfer frames that will be ++ # sent to the router. ++ #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = true, batchable = false] ++ #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = false, batchable = false] ++ EMPTY_TRANSFER_MORE_TRUE = bytearray( ++ b'\x00\x00\x00\x1c\x02\x00\x00\x00' ++ + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04' ++ + b'\xa0\x01\x04\x43\x42' ++ + b'\x41\x40\x40\x40\x40\x42') ++ EMPTY_TRANSFER_MORE_FALSE = bytearray( ++ b'\x00\x00\x00\x1c\x02\x00\x00\x00' ++ + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04' ++ + b'\xa0\x01\x04\x43\x42' ++ + b'\x42\x40\x40\x40\x40\x42') ++ s.sendall(EMPTY_TRANSFER_MORE_TRUE) ++ s.sendall(EMPTY_TRANSFER_MORE_FALSE) ++ sleep(0.5) ++ data = s.recv(1024) ++ # The delivery has been rejected. ++ self.assertIn("x00S%E", repr(data)) ++ ++ s.close() ++ ++ + class ConnectionLinkRouteTest(TestCase): + """ + Test connection scoped link route implementation +-- +2.20.1 + diff --git a/qpid-dispatch.spec b/qpid-dispatch.spec index 5227c69..6b03e5b 100644 --- a/qpid-dispatch.spec +++ b/qpid-dispatch.spec @@ -30,7 +30,7 @@ %endif ExcludeArch: i686 -ExcludeArch: armv7hl +#ExcludeArch: armv7hl %global proton_minimum_version 0.33.0 %global libwebsockets_minimum_version 3.2.0 @@ -61,7 +61,7 @@ ExcludeArch: i686 Source1: docs-%{version}-1.tar.gz %endif -#Patch1: dispatch.patch +Patch1: dispatch.patch Patch4: console-listener.patch BuildRequires: gcc @@ -221,7 +221,7 @@ Requires: %{pythonx}-qpid-proton >= %{proton_minimum_version} %prep %setup -q -#%patch1 -p1 +%patch1 -p1 %patch4 -p1 mkdir pre_built