Skip to content

Commit 329d57f

Browse files
[regression](move-memtable) test LoadStream on_idle_timeout (apache#29354)
Signed-off-by: freemandealer <[email protected]>
1 parent 28ff349 commit 329d57f

File tree

5 files changed

+66
-5
lines changed

5 files changed

+66
-5
lines changed

be/src/common/config.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,7 @@ DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
777777
DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min
778778

779779
// idle timeout for load stream in ms
780-
DEFINE_Int64(load_stream_idle_timeout_ms, "600000");
780+
DEFINE_mInt64(load_stream_idle_timeout_ms, "600000");
781781
// brpc streaming max_buf_size in bytes
782782
DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
783783
// brpc streaming messages_in_batch

be/src/runtime/load_stream.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data)
454454
IndexStreamSharedPtr index_stream;
455455

456456
int64_t index_id = header.index_id();
457-
DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_indexid",
457+
DBUG_EXECUTE_IF("TabletStream._append_data.unknown_indexid",
458458
{ index_id = UNKNOWN_ID_FOR_TEST; });
459459
auto it = _index_streams_map.find(index_id);
460460
if (it == _index_streams_map.end()) {

be/src/service/internal_service.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,7 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
391391
}
392392

393393
stream_options.handler = load_stream.get();
394-
// TODO : set idle timeout
395-
// stream_options.idle_timeout_ms =
394+
stream_options.idle_timeout_ms = config::load_stream_idle_timeout_ms;
396395

397396
StreamId streamid;
398397
if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {

be/src/vec/sink/load_stream_stub.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "olap/rowset/rowset_writer.h"
2323
#include "util/brpc_client_cache.h"
24+
#include "util/debug_points.h"
2425
#include "util/network_util.h"
2526
#include "util/thrift_util.h"
2627
#include "util/uid_util.h"
@@ -330,6 +331,10 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
330331
int ret;
331332
{
332333
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
334+
DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.delay_before_send", {
335+
int64_t delay_ms = dp->param<int64>("delay_ms", 1000);
336+
bthread_usleep(delay_ms * 1000);
337+
});
333338
ret = brpc::StreamWrite(_stream_id, buf);
334339
}
335340
DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", { ret = EPIPE; });

regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy

+58-1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,52 @@ suite("load_stream_fault_injection", "nonConcurrent") {
6767
file "baseall.txt"
6868
}
6969

70+
def backendId_to_backendIP = [:]
71+
def backendId_to_backendHttpPort = [:]
72+
def backendId_to_params = [string:[:]]
73+
74+
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
75+
76+
def set_be_param = { paramName, paramValue ->
77+
// for eache be node, set paramName=paramValue
78+
for (String id in backendId_to_backendIP.keySet()) {
79+
def beIp = backendId_to_backendIP.get(id)
80+
def bePort = backendId_to_backendHttpPort.get(id)
81+
def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
82+
assertTrue(out.contains("OK"))
83+
}
84+
}
85+
86+
def reset_be_param = { paramName ->
87+
// for eache be node, reset paramName to default
88+
for (String id in backendId_to_backendIP.keySet()) {
89+
def beIp = backendId_to_backendIP.get(id)
90+
def bePort = backendId_to_backendHttpPort.get(id)
91+
def original_value = backendId_to_params.get(id).get(paramName)
92+
def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
93+
assertTrue(out.contains("OK"))
94+
}
95+
}
96+
97+
def get_be_param = { paramName ->
98+
// for eache be node, get param value by default
99+
def paramValue = ""
100+
for (String id in backendId_to_backendIP.keySet()) {
101+
def beIp = backendId_to_backendIP.get(id)
102+
def bePort = backendId_to_backendHttpPort.get(id)
103+
// get the config value from be
104+
def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
105+
assertTrue(code == 0)
106+
assertTrue(out.contains(paramName))
107+
// parsing
108+
def resultList = parseJson(out)[0]
109+
assertTrue(resultList.size() == 4)
110+
// get original value
111+
paramValue = resultList[2]
112+
backendId_to_params.get(id, [:]).put(paramName, paramValue)
113+
}
114+
}
115+
70116
def load_with_injection = { injection, expect_errmsg ->
71117
try {
72118
GetDebugPoint().enableDebugPointForAllBEs(injection)
@@ -110,10 +156,21 @@ suite("load_stream_fault_injection", "nonConcurrent") {
110156
// LoadStream add_segment meet unknown segid in request header
111157
load_with_injection("TabletStream.add_segment.unknown_segid", "")
112158
// LoadStream append_data meet unknown index id in request header
113-
load_with_injection("abletStream.add_segment.unknown_indexid", "")
159+
load_with_injection("TabletStream._append_data.unknown_indexid", "")
114160
// LoadStream dispatch meet unknown load id
115161
load_with_injection("LoadStream._dispatch.unknown_loadid", "")
116162
// LoadStream dispatch meet unknown src id
117163
load_with_injection("LoadStream._dispatch.unknown_srcid", "")
164+
165+
// LoadStream meets StreamRPC idle timeout
166+
get_be_param("load_stream_idle_timeout_ms")
167+
set_be_param("load_stream_idle_timeout_ms", 500)
168+
try {
169+
load_with_injection("LoadStreamStub._send_with_retry.delay_before_send", "")
170+
} catch(Exception e) {
171+
logger.info(e.getMessage())
172+
} finally {
173+
reset_be_param("load_stream_idle_timeout_ms")
174+
}
118175
}
119176

0 commit comments

Comments
 (0)