Skip to content

Commit

Permalink
Thread pool (oceanbase#332)
Browse files Browse the repository at this point in the history
### What problem were solved in this pull request?

Issue Number: close oceanbase#322 oceanbase#301 oceanbase#143 oceanbase#47 
ref oceanbase#138 

Problem:
当前使用的seda处理框架和libevent消息处理模式不适用于SQL处理模型。

SQL
请求处理模式是来一个请求应答后才能接收新的请求。但是当前收到某个连接的消息后在返回应答之前,可以接收新的请求,导致可以同时处理同一个连接的多个请求,出现并发错误。

另外,同一个SQL请求,需要有一个统一的处理上下文,不能在线程间切来切去。而seda将处理请求分为多个stage,会在不同的线程间切换。

### What is changed and how it works?
新增线程模型概念,用来处理连接上的请求。设计文档参考 miniob-thread-model.md。

对于新连接监听,放在主线程来做。接收到新连接时,交给线程模型来处理,可以是一个连接一个线程,也可以是一个线程池统一管理。
不管使用哪种模型,在处理一个SQL结束之前,不会去监听该连接上是否有新的消息到达,因此也不会出现一个连接上多个请求同时处理的情况。

### Other information
本设计参考了Java的线程池和Percona/MariaDB的线程池(虽然没有实现)设计
  • Loading branch information
hnwyllmm authored Jan 22, 2024
1 parent 75f9cf8 commit 94c39e8
Show file tree
Hide file tree
Showing 80 changed files with 2,591 additions and 4,980 deletions.
83 changes: 52 additions & 31 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,55 @@ jobs:
# sysbench cannot work property on this platform.
# I found that sysbench would send more request before receiving last response
# sysbench-test:
# runs-on: ubuntu-latest


# steps:
# - name: Checkout repository and submodules
# uses: actions/checkout@v2

# - name: install sysbench and mariadb-client
# shell: bash
# run: |
# curl -s https://packagecloud.io/install/repositories/akopytov/sysbench/script.deb.sh -o script.deb.sh
# sudo bash script.deb.sh
# sudo apt -y install sysbench mariadb-client

# - name: start server
# shell: bash
# run: |
# sudo bash build.sh init
# bash build.sh -DCONCURRENCY=ON -DWITH_UNIT_TESTS=OFF
# nohup ./build_debug/bin/observer -s /tmp/miniob.sock -f etc/observer.ini -P mysql -t mvcc &
# sleep 10 && echo "wake up"
# mysql --version
# mysql -S /tmp/miniob.sock -e "show tables"

# - name: sysbench test
# shell: bash
# run: |
# cd test/sysbench
# sysbench --mysql-socket=/tmp/miniob.sock --threads=10 miniob_insert prepare
# sysbench --mysql-socket=/tmp/miniob.sock --threads=10 miniob_insert run
sysbench-test:
strategy:
matrix:
thread_model: ['one-thread-per-connection', 'java-thread-pool']
test_case: ['miniob_insert', 'miniob_delete', 'miniob_select']

runs-on: ubuntu-latest
steps:
- name: Checkout repository and submodules
uses: actions/checkout@v2

- name: install sysbench and mariadb-client
shell: bash
run: |
curl -s https://packagecloud.io/install/repositories/akopytov/sysbench/script.deb.sh -o script.deb.sh
sudo bash script.deb.sh
sudo apt -y install sysbench mariadb-client
- name: start server
shell: bash
run: |
sudo bash build.sh init
bash build.sh release -DCONCURRENCY=ON -DWITH_UNIT_TESTS=OFF
nohup ./build_release/bin/observer -T ${{ matrix.thread_model }} -s /tmp/miniob.sock -f etc/observer.ini -P mysql -t mvcc &
sleep 10 && echo "wake up"
mysql --version
mysql -S /tmp/miniob.sock -e "show tables"
- name: sysbench test
shell: bash
run: |
cd test/sysbench
sysbench --mysql-socket=/tmp/miniob.sock --threads=10 ${{ matrix.test_case }} prepare
sysbench --mysql-socket=/tmp/miniob.sock --threads=10 ${{ matrix.test_case }} run
benchmark-test:
runs-on: ubuntu-latest
steps:
- name: Checkout repository and submodules
uses: actions/checkout@v2

- name: build observer and benchmark
shell: bash
run: |
sudo bash build.sh init
bash build.sh release -DCONCURRENCY=ON -DWITH_UNIT_TESTS=OFF -DWITH_BENCHMARK=ON
- name: testing
shell: bash
run: |
cd build_release/bin/
for file in `find ./ -name "*_concurrency_test" -executable`; do $file; if [ $? -ne 0 ]; then exit 1; fi; done
36 changes: 22 additions & 14 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ OPTION(ENABLE_ASAN "Enable build with address sanitizer" ON)
OPTION(ENABLE_TSAN "Build with thread sanitizer" OFF)
OPTION(ENABLE_UBSAN "Build with undefined behavior sanitizer" OFF)
OPTION(WITH_UNIT_TESTS "Compile miniob with unit tests" ON)
OPTION(WITH_BENCHMARK "Compile benchmark" OFF)
OPTION(ENABLE_COVERAGE "Enable unittest coverage" OFF)
OPTION(CONCURRENCY "Support concurrency operations" OFF)
OPTION(STATIC_STDLIB "Link std library static or dynamic, such as libgcc, libstdc++, libasan" OFF)

Expand Down Expand Up @@ -63,6 +65,7 @@ IF ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU" AND ${STATIC_STDLIB})
ENDIF()

IF (ENABLE_ASAN)
MESSAGE(STATUS "Instrumenting with Address Sanitizer")
SET(CMAKE_COMMON_FLAGS "${CMAKE_COMMON_FLAGS} -fno-omit-frame-pointer -fsanitize=address -fsanitize-address-use-after-scope")
IF ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU" AND ${STATIC_STDLIB})
ADD_LINK_OPTIONS(-static-libasan)
Expand All @@ -87,18 +90,18 @@ IF (ENABLE_TSAN)
ENDIF (ENABLE_TSAN)

IF (ENABLE_UBSAN)
# Only success on Mac Clang
MESSAGE(STATUS "Instrumenting with Undefined Behavior Sanitizer")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fno-omit-frame-pointer")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=undefined")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=implicit-conversion")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=implicit-integer-truncation")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=integer")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=nullability")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=vptr")
SET(CMAKE_COMMON_FLAGS "${CMAKE_COMMON_FLAGS} ${UBSAN_FLAGS}")
SET(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} ${UBSAN_FLAGS}")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${UBSAN_FLAGS}")
# Only success on Mac Clang
MESSAGE(STATUS "Instrumenting with Undefined Behavior Sanitizer")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fno-omit-frame-pointer")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=undefined")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=implicit-conversion")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=implicit-integer-truncation")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=integer")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=nullability")
SET(UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize=vptr")
SET(CMAKE_COMMON_FLAGS "${CMAKE_COMMON_FLAGS} ${UBSAN_FLAGS}")
SET(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} ${UBSAN_FLAGS}")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${UBSAN_FLAGS}")
ENDIF (ENABLE_UBSAN)

IF (CMAKE_INSTALL_PREFIX)
Expand Down Expand Up @@ -127,7 +130,9 @@ ENDIF ()
INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/deps /usr/local/include)

IF(WITH_UNIT_TESTS)
SET(CMAKE_COMMON_FLAGS "${CMAKE_COMMON_FLAGS} -fprofile-arcs -ftest-coverage")
IF (ENABLE_COVERAGE)
SET(CMAKE_COMMON_FLAGS "${CMAKE_COMMON_FLAGS} -fprofile-arcs -ftest-coverage")
ENDIF (ENABLE_COVERAGE)
enable_testing()
ENDIF(WITH_UNIT_TESTS)

Expand All @@ -140,9 +145,12 @@ ADD_SUBDIRECTORY(deps)
ADD_SUBDIRECTORY(src/obclient)
ADD_SUBDIRECTORY(src/observer)
ADD_SUBDIRECTORY(test/perf)
ADD_SUBDIRECTORY(benchmark)
ADD_SUBDIRECTORY(tools)

IF (WITH_BENCHMARK)
ADD_SUBDIRECTORY(benchmark)
ENDIF(WITH_BENCHMARK)

IF(WITH_UNIT_TESTS)
ADD_SUBDIRECTORY(unittest)
ENDIF(WITH_UNIT_TESTS)
Expand Down
2 changes: 1 addition & 1 deletion benchmark/bplus_tree_concurrency_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details. */
#include <stdexcept>

#include "common/log/log.h"
#include "integer_generator.h"
#include "common/math/integer_generator.h"
#include "storage/buffer/disk_buffer_pool.h"
#include "storage/index/bplus_tree.h"

Expand Down
29 changes: 19 additions & 10 deletions benchmark/record_manager_concurrency_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ See the Mulan PSL v2 for more details. */
#include <stdexcept>

#include "common/log/log.h"
#include "integer_generator.h"
#include "common/math/integer_generator.h"
#include "storage/buffer/disk_buffer_pool.h"
#include "storage/common/condition_filter.h"
#include "storage/record/record_manager.h"
Expand Down Expand Up @@ -87,7 +87,7 @@ class BenchmarkBase : public Fixture

string log_name = this->Name() + ".log";
string record_filename = this->record_filename();
LoggerFactory::init_default(log_name.c_str(), LOG_LEVEL_TRACE);
LoggerFactory::init_default(log_name.c_str(), LOG_LEVEL_INFO);

std::call_once(init_bpm_flag, []() { BufferPoolManager::set_instance(&bpm); });

Expand All @@ -110,8 +110,8 @@ class BenchmarkBase : public Fixture
LOG_WARN("failed to init record file handler. rc=%s", strrc(rc));
throw runtime_error("failed to init record file handler");
}
LOG_INFO(
"test %s setup done. threads=%d, thread index=%d", this->Name().c_str(), state.threads(), state.thread_index());
LOG_INFO("test %s setup done. threads=%d, thread index=%d",
this->Name().c_str(), state.threads(), state.thread_index());
}

virtual void TearDown(const State &state)
Expand Down Expand Up @@ -156,9 +156,9 @@ class BenchmarkBase : public Fixture

uint32_t GetRangeMax(const State &state) const
{
uint32_t max = static_cast<uint32_t>(state.range(0) * 3);
int32_t max = static_cast<int32_t>(state.range(0) * 3);
if (max <= 0) {
max = (1 << 31);
max = INT32_MAX - 1;
}
return max;
}
Expand Down Expand Up @@ -261,24 +261,33 @@ class DeletionBenchmark : public BenchmarkBase

void SetUp(const State &state) override
{
BenchmarkBase::SetUp(state);

if (0 != state.thread_index()) {
while (!setup_done_) {
this_thread::sleep_for(chrono::milliseconds(100));
}
return;
}

BenchmarkBase::SetUp(state);

uint32_t max = GetRangeMax(state);
ASSERT(max > 0, "invalid argument count. %ld", state.range(0));
FillUp(0, max, rids_);
ASSERT(rids_.size() > 0, "invalid argument count. %ld", rids_.size());
setup_done_ = true;
}

protected:
vector<RID> rids_;
// 从实际测试情况看,每个线程都会执行setup,但是它们操作的对象都是同一个
// 但是每个线程set up结束后,就会执行测试了。如果不等待的话,就会导致有些
// 线程访问的数据不是想要的结果
volatile bool setup_done_ = false;
vector<RID> rids_;
};

BENCHMARK_DEFINE_F(DeletionBenchmark, Deletion)(State &state)
{
IntegerGenerator generator(0, static_cast<int>(rids_.size()));
IntegerGenerator generator(0, static_cast<int>(rids_.size() - 1));
Stat stat;

for (auto _ : state) {
Expand Down
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ function do_init
cd ${TOPDIR}/deps/3rd/benchmark && \
mkdir -p build && \
cd build && \
${CMAKE_COMMAND} .. -DBENCHMARK_ENABLE_TESTING=OFF -DBENCHMARK_INSTALL_DOCS=OFF -DBENCHMARK_ENABLE_GTEST_TESTS=OFF -DBENCHMARK_USE_BUNDLED_GTEST=OFF -DBENCHMARK_ENABLE_ASSEMBLY_TESTS=OFF && \
${CMAKE_COMMAND} .. -DCMAKE_BUILD_TYPE=RelWithDebInfo -DBENCHMARK_ENABLE_TESTING=OFF -DBENCHMARK_INSTALL_DOCS=OFF -DBENCHMARK_ENABLE_GTEST_TESTS=OFF -DBENCHMARK_USE_BUNDLED_GTEST=OFF -DBENCHMARK_ENABLE_ASSEMBLY_TESTS=OFF && \
${MAKE_COMMAND} -j4 && \
${MAKE_COMMAND} install

Expand Down
9 changes: 9 additions & 0 deletions deps/common/log/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,15 @@ int Log::out(const LOG_LEVEL console_level, const LOG_LEVEL log_level, T &msg)

#endif // ASSERT

#ifndef TRACE
#ifdef DEBUG
#define TRACE(format, ...) LOG_TRACE(format, ##__VA_ARGS__)
#else // DEBUG
#define TRACE(...)
#endif // DEBUG

#endif // TRACE

#define SYS_OUTPUT_FILE_POS ", File:" << __FILE__ << ", line:" << __LINE__ << ",function:" << __FUNCTION__
#define SYS_OUTPUT_ERROR ",error:" << errno << ":" << strerror(errno)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,29 @@ See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/05/04
//

#pragma once

#include <random>

namespace common {

class IntegerGenerator
{
public:
IntegerGenerator(int min, int max) : distrib_(min, max) {}

IntegerGenerator(const IntegerGenerator &other) = delete;
IntegerGenerator(IntegerGenerator &&) = delete;
IntegerGenerator &operator=(const IntegerGenerator &) = delete;

int next() { return distrib_(rd_); }
int min() const { return distrib_.min(); }
int max() const { return distrib_.max(); }

private:
std::random_device rd_;
std::uniform_int_distribution<> distrib_;
};
};

} // namespace common
10 changes: 10 additions & 0 deletions deps/common/os/process_param.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ class ProcessParam
}
const std::string &trx_kit_name() const { return trx_kit_name_; }

void set_thread_handling_name(const char *thread_handling_name)
{
if (thread_handling_name) {
thread_handling_name_ = thread_handling_name;
}
}

const std::string &thread_handling_name() const { return thread_handling_name_; }

void set_buffer_pool_memory_size(int bytes) { buffer_pool_memory_size_ = bytes; }

int buffer_pool_memory_size() const { return buffer_pool_memory_size_; }
Expand All @@ -87,6 +96,7 @@ class ProcessParam
std::string unix_socket_path_;
std::string protocol_;
std::string trx_kit_name_;
std::string thread_handling_name_;
int buffer_pool_memory_size_ = -1;
};

Expand Down
22 changes: 11 additions & 11 deletions deps/common/os/signal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ See the Mulan PSL v2 for more details. */
#include "pthread.h"
namespace common {

void setSignalHandler(int sig, sighandler_t func)
void set_signal_handler(int sig, sighandler_t func)
{
struct sigaction newsa, oldsa;
sigemptyset(&newsa.sa_mask);
Expand All @@ -33,16 +33,16 @@ void setSignalHandler(int sig, sighandler_t func)
/*
** Set Singal handling Fucntion
*/
void setSignalHandler(sighandler_t func)
void set_signal_handler(sighandler_t func)
{
setSignalHandler(SIGQUIT, func);
setSignalHandler(SIGINT, func);
setSignalHandler(SIGHUP, func);
setSignalHandler(SIGTERM, func);
set_signal_handler(SIGQUIT, func);
set_signal_handler(SIGINT, func);
set_signal_handler(SIGHUP, func);
set_signal_handler(SIGTERM, func);
signal(SIGPIPE, SIG_IGN);
}

void blockDefaultSignals(sigset_t *signal_set, sigset_t *old_set)
void block_default_signals(sigset_t *signal_set, sigset_t *old_set)
{
sigemptyset(signal_set);
#ifndef DEBUG
Expand All @@ -54,7 +54,7 @@ void blockDefaultSignals(sigset_t *signal_set, sigset_t *old_set)
pthread_sigmask(SIG_BLOCK, signal_set, old_set);
}

void unBlockDefaultSignals(sigset_t *signal_set, sigset_t *old_set)
void unblock_default_signals(sigset_t *signal_set, sigset_t *old_set)
{
sigemptyset(signal_set);
#ifndef DEBUG
Expand All @@ -65,7 +65,7 @@ void unBlockDefaultSignals(sigset_t *signal_set, sigset_t *old_set)
pthread_sigmask(SIG_UNBLOCK, signal_set, old_set);
}

void *waitForSignals(void *args)
void *wait_for_signals(void *args)
{
LOG_INFO("Start thread to wait signals.");
sigset_t *signal_set = (sigset_t *)args;
Expand All @@ -81,7 +81,7 @@ void *waitForSignals(void *args)
return NULL;
}

void startWaitForSignals(sigset_t *signal_set)
void start_wait_for_signals(sigset_t *signal_set)
{
pthread_t pThread;
pthread_attr_t pThreadAttrs;
Expand All @@ -90,6 +90,6 @@ void startWaitForSignals(sigset_t *signal_set)
pthread_attr_init(&pThreadAttrs);
pthread_attr_setdetachstate(&pThreadAttrs, PTHREAD_CREATE_DETACHED);

pthread_create(&pThread, &pThreadAttrs, waitForSignals, (void *)signal_set);
pthread_create(&pThread, &pThreadAttrs, wait_for_signals, (void *)signal_set);
}
} // namespace common
Loading

0 comments on commit 94c39e8

Please sign in to comment.