Skip to content

Commit

Permalink
Merge branch 'master' into scanner_wait
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring authored Jan 3, 2024
2 parents 636a1f9 + 0d0b9d6 commit 8a775ae
Show file tree
Hide file tree
Showing 1,233 changed files with 43,850 additions and 8,811 deletions.
8 changes: 4 additions & 4 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ github:
- BE UT (Doris BE UT)
- Build Broker
- Build Documents
- ShellCheck
- ShellCheck
- clickbench-new (clickbench)
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
Expand Down Expand Up @@ -94,8 +94,8 @@ github:
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
- Build Broker
- Build Documents
- ShellCheck
- Build Documents
- clickbench-new (clickbench)
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
Expand All @@ -112,11 +112,11 @@ github:
- platoneko
- Lchangliang
- freemandealer
- nanfeng1999
- gitccl
- shuke987
- wm1581066
- KassieZ
- yujun777
- gavinchou

notifications:
pullrequests_status: [email protected]
Expand Down
34 changes: 32 additions & 2 deletions .github/workflows/comment-to-trigger-teamcity.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ jobs:
"${COMMENT_BODY}" == *'run pipelinex_p0'* ||
"${COMMENT_BODY}" == *'run clickbench'* ||
"${COMMENT_BODY}" == *'run arm'* ||
"${COMMENT_BODY}" == *'run tpch'* ]]; then
"${COMMENT_BODY}" == *'run tpch'* ||
"${COMMENT_BODY}" == *'run performance'* ]]; then
echo "comment_trigger=true" | tee -a "$GITHUB_OUTPUT"
else
echo "comment_trigger=false" | tee -a "$GITHUB_OUTPUT"
Expand All @@ -63,7 +64,7 @@ jobs:
echo "TARGET_BRANCH='${TARGET_BRANCH}'" | tee -a "$GITHUB_OUTPUT"
echo "COMMENT_BODY='${COMMENT_BODY}'" | tee -a "$GITHUB_OUTPUT"
reg="run (buildall|compile|p0|p1|feut|beut|external|clickbench|pipelinex_p0|arm|tpch)( [1-9]*[0-9]+)*"
reg="run (buildall|compile|p0|p1|feut|beut|external|clickbench|pipelinex_p0|arm|tpch|performance)( [1-9]*[0-9]+)*"
COMMENT_TRIGGER_TYPE="$(echo -e "${COMMENT_BODY}" | xargs | grep -E "${reg}" | awk -F' ' '{print $2}' | sed -n 1p | sed 's/\r//g')"
COMMENT_REPEAT_TIMES="$(echo -e "${COMMENT_BODY}" | xargs | grep -E "${reg}" | awk -F' ' '{print $3}' | sed -n 1p | sed 's/\r//g')"
echo "COMMENT_TRIGGER_TYPE=${COMMENT_TRIGGER_TYPE}" | tee -a "$GITHUB_OUTPUT"
Expand Down Expand Up @@ -113,6 +114,11 @@ jobs:
else
echo "changed_p1=false" | tee -a "$GITHUB_OUTPUT"
fi
if file_changed_performance; then
echo "changed_performance=true" | tee -a "$GITHUB_OUTPUT"
else
echo "changed_performance=false" | tee -a "$GITHUB_OUTPUT"
fi
else
echo "INFO: failed to _get_pr_changed_files, default trigger all"
echo "changed_fe_ut=true" | tee -a "$GITHUB_OUTPUT"
Expand All @@ -124,6 +130,7 @@ jobs:
echo "changed_pipelinex_p0=true" | tee -a "$GITHUB_OUTPUT"
echo "changed_arm=true" | tee -a "$GITHUB_OUTPUT"
echo "changed_p1=true" | tee -a "$GITHUB_OUTPUT"
echo "changed_performance=true" | tee -a "$GITHUB_OUTPUT"
fi
# - name: "Setup tmate session"
Expand Down Expand Up @@ -266,3 +273,26 @@ jobs:
"${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" \
"tpch" \
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
- name: "Trigger or Skip performance"
if: ${{ fromJSON(steps.parse.outputs.comment_trigger) && contains(fromJSON('["performance", "buildall"]'), steps.parse.outputs.COMMENT_TRIGGER_TYPE) }}
run: |
source ./regression-test/pipeline/common/teamcity-utils.sh
set -x
if [[ "${{ steps.parse.outputs.TARGET_BRANCH }}" == "'master'" ]]; then
echo "PR target branch is master, need run performance"
trigger_or_skip_build \
"${{ steps.changes.outputs.changed_performance }}" \
"${{ steps.parse.outputs.PULL_REQUEST_NUM }}" \
"${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" \
"performance" \
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
else
echo "PR target branch is not master, skip run performance"
trigger_or_skip_build \
"false" \
"${{ steps.parse.outputs.PULL_REQUEST_NUM }}" \
"${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" \
"performance" \
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
fi
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ header:
- "conf/mysql_ssl_default_certificate/client_certificate/client-cert.pem"
- "conf/mysql_ssl_default_certificate/client_certificate/client-key.pem"
- "regression-test/ssl_default_certificate/*"
- "regression-test/pipeline/performance/conf/session_variables"
- "extension/beats/go.mod"
- "extension/beats/go.sum"
- "pytest/hdfs"
Expand Down
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache Doris
Copyright 2018-2023 The Apache Software Foundation
Copyright 2018-2024 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down
8 changes: 8 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "agent/topic_subscriber.h"
#include "agent/utils.h"
#include "agent/workload_group_listener.h"
#include "agent/workload_sched_policy_listener.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
Expand Down Expand Up @@ -72,6 +73,13 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
LOG(INFO) << "Register workload group listener";
_topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP,
std::move(wg_listener));

std::unique_ptr<TopicListener> policy_listener =
std::make_unique<WorkloadschedPolicyListener>(exec_env);
LOG(INFO) << "Register workload scheduler policy listener";
_topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_SCHED_POLICY,
std::move(policy_listener));

#endif
}

Expand Down
15 changes: 13 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ ReportWorker::ReportWorker(std::string name, const TMasterInfo& master_info, int
};

auto st = Thread::create("ReportWorker", _name, report_loop, &_thread);
CHECK(st.ok()) << name << ": " << st;
CHECK(st.ok()) << _name << ": " << st;
}

ReportWorker::~ReportWorker() {
Expand Down Expand Up @@ -731,6 +731,16 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest&
tablet_meta_info.time_series_compaction_time_threshold_seconds);
need_to_save = true;
}
if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) {
if (tablet->tablet_meta()->compaction_policy() != "time_series") {
status = Status::InvalidArgument(
"only time series compaction policy support time series config");
continue;
}
tablet->tablet_meta()->set_time_series_compaction_empty_rowsets_threshold(
tablet_meta_info.time_series_compaction_empty_rowsets_threshold);
need_to_save = true;
}
if (tablet_meta_info.__isset.replica_id) {
tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
}
Expand Down Expand Up @@ -1518,7 +1528,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
tablet->published_count.fetch_add(1);
int64_t published_count = tablet->published_count.load();
if (published_count % 10 == 0) {
if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) &&
published_count % 20 == 0) {
auto st = _engine.submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true);
if (!st.ok()) [[unlikely]] {
Expand Down
78 changes: 78 additions & 0 deletions be/src/agent/workload_sched_policy_listener.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "agent/workload_sched_policy_listener.h"

#include "runtime/workload_management/workload_action.h"
#include "runtime/workload_management/workload_condition.h"
#include "runtime/workload_management/workload_sched_policy.h"
#include "runtime/workload_management/workload_sched_policy_mgr.h"

namespace doris {

void WorkloadschedPolicyListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> policy_map;
for (const TopicInfo& topic_info : topic_info_list) {
if (!topic_info.__isset.workload_sched_policy) {
continue;
}

TWorkloadSchedPolicy tpolicy = topic_info.workload_sched_policy;
// some metric or action can not exec in be, then need skip
bool need_skip_current_policy = false;

std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
for (TWorkloadCondition& cond : tpolicy.condition_list) {
std::unique_ptr<WorkloadCondition> cond_ptr =
WorkloadConditionFactory::create_workload_condition(&cond);
if (cond_ptr == nullptr) {
need_skip_current_policy = true;
break;
}
cond_ptr_list.push_back(std::move(cond_ptr));
}
if (need_skip_current_policy) {
continue;
}

std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
for (TWorkloadAction& action : tpolicy.action_list) {
std::unique_ptr<WorkloadAction> action_ptr =
WorkloadActionFactory::create_workload_action(&action);
if (action_ptr == nullptr) {
need_skip_current_policy = true;
break;
}
action_ptr_list.push_back(std::move(action_ptr));
}
if (need_skip_current_policy) {
continue;
}

std::shared_ptr<WorkloadSchedPolicy> policy_ptr = std::make_shared<WorkloadSchedPolicy>();
policy_ptr->init(tpolicy.id, tpolicy.name, tpolicy.version, tpolicy.enabled,
tpolicy.priority, std::move(cond_ptr_list), std::move(action_ptr_list));
policy_map.emplace(tpolicy.id, std::move(policy_ptr));
}
size_t new_policy_size = policy_map.size();
if (new_policy_size > 0) {
_exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map));
}
LOG(INFO) << "[workload_schedule]finish update workload schedule policy, size="
<< new_policy_size;
}
} // namespace doris
37 changes: 37 additions & 0 deletions be/src/agent/workload_sched_policy_listener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <glog/logging.h>

#include "agent/topic_listener.h"
#include "runtime/exec_env.h"

namespace doris {

class WorkloadschedPolicyListener : public TopicListener {
public:
WorkloadschedPolicyListener(ExecEnv* exec_env) : _exec_env(exec_env) {}

void handle_topic_info(const std::vector<TopicInfo>& topic_info_list) override;

private:
ExecEnv* _exec_env = nullptr;
};

} // namespace doris
Loading

0 comments on commit 8a775ae

Please sign in to comment.