From 20607f04afb283aec35efff82bc984a6bb8f4a81 Mon Sep 17 00:00:00 2001 From: baytan0720 Date: Fri, 3 Nov 2023 11:20:22 +0800 Subject: [PATCH 01/12] [fix]tools-v2: add version in function NewQuerySubUri Signed-off-by: baytan0720 --- tools-v2/internal/utils/snapshot/snapshot.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools-v2/internal/utils/snapshot/snapshot.go b/tools-v2/internal/utils/snapshot/snapshot.go index b4fdbf7df0..f160f79e55 100644 --- a/tools-v2/internal/utils/snapshot/snapshot.go +++ b/tools-v2/internal/utils/snapshot/snapshot.go @@ -28,6 +28,8 @@ import ( ) func NewQuerySubUri(params map[string]any) string { + params[QueryVersion] = Version + values := strings.Builder{} for key, value := range params { if value != "" && value != nil { From cad7152a8d174a54e123c1fe28c0000158b3e22c Mon Sep 17 00:00:00 2001 From: Hanqing Wu Date: Tue, 31 Oct 2023 19:27:20 +0800 Subject: [PATCH 02/12] Fix metaserver deadlock caused by bthread coroutine switching Signed-off-by: Hanqing Wu --- curvefs/src/metaserver/partition.cpp | 97 ++++++++++++++++------------ src/common/concurrent/rw_lock.h | 58 +++++++++++++++-- 2 files changed, 109 insertions(+), 46 deletions(-) diff --git a/curvefs/src/metaserver/partition.cpp b/curvefs/src/metaserver/partition.cpp index 3c3e7d7b99..839e21e93a 100644 --- a/curvefs/src/metaserver/partition.cpp +++ b/curvefs/src/metaserver/partition.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include "curvefs/proto/metaserver.pb.h" #include "curvefs/src/metaserver/copyset/copyset_node_manager.h" @@ -537,8 +538,17 @@ MetaStatusCode Partition::GetAllBlockGroup( } void Partition::StartS3Compact() { - S3CompactManager::GetInstance().Register( - S3Compact{inodeManager_, partitionInfo_}); + // register s3 compaction task in a separate thread, since the caller may + // holds a pthread wrlock when calling this function, and create `S3Compact` + // will acquire a bthread rwlock, may cause thread switching, thus causing a + // deadlock. + // FIXME(wuhanqing): handle it in a more elegant way + auto handle = std::async(std::launch::async, [this]() { + S3CompactManager::GetInstance().Register( + S3Compact{inodeManager_, partitionInfo_}); + }); + + handle.wait(); } void Partition::CancelS3Compact() { @@ -546,45 +556,50 @@ void Partition::CancelS3Compact() { } void Partition::StartVolumeDeallocate() { - FsInfo fsInfo; - bool ok = - FsInfoManager::GetInstance().GetFsInfo(partitionInfo_.fsid(), &fsInfo); - if (!ok) { - LOG(ERROR) - << "Partition start volume deallocate fail, get fsinfo fail. fsid=" - << partitionInfo_.fsid(); - return; - } - - if (!fsInfo.detail().has_volume()) { - LOG(INFO) << "Partition not belong to volume, do not need start " - "deallocate. partitionInfo=" - << partitionInfo_.DebugString(); - return; - } - - VolumeDeallocateCalOption calOpt; - calOpt.kvStorage = kvStorage_; - calOpt.inodeStorage = inodeStorage_; - calOpt.nameGen = nameGen_; - auto copysetNode = - copyset::CopysetNodeManager::GetInstance().GetSharedCopysetNode( - partitionInfo_.poolid(), partitionInfo_.copysetid()); - if (copysetNode == nullptr) { - LOG(ERROR) << "Partition get copyset node failed. poolid=" - << partitionInfo_.poolid() - << ", copysetid=" << partitionInfo_.copysetid(); - return; - } - - InodeVolumeSpaceDeallocate task(partitionInfo_.fsid(), - partitionInfo_.partitionid(), copysetNode); - task.Init(calOpt); - - VolumeDeallocateManager::GetInstance().Register(std::move(task)); - - VLOG(3) << "Partition start volume deallocate success. partitionInfo=" - << partitionInfo_.DebugString(); + // FIXME(wuhanqing): same as `StartS3Compact` + auto handle = std::async(std::launch::async, [this]() { + FsInfo fsInfo; + bool ok = FsInfoManager::GetInstance().GetFsInfo(partitionInfo_.fsid(), + &fsInfo); + if (!ok) { + LOG(ERROR) << "Partition start volume deallocate fail, get fsinfo " + "fail. fsid=" + << partitionInfo_.fsid(); + return; + } + + if (!fsInfo.detail().has_volume()) { + LOG(INFO) << "Partition not belong to volume, do not need start " + "deallocate. partitionInfo=" + << partitionInfo_.DebugString(); + return; + } + + VolumeDeallocateCalOption calOpt; + calOpt.kvStorage = kvStorage_; + calOpt.inodeStorage = inodeStorage_; + calOpt.nameGen = nameGen_; + auto copysetNode = + copyset::CopysetNodeManager::GetInstance().GetSharedCopysetNode( + partitionInfo_.poolid(), partitionInfo_.copysetid()); + if (copysetNode == nullptr) { + LOG(ERROR) << "Partition get copyset node failed. poolid=" + << partitionInfo_.poolid() + << ", copysetid=" << partitionInfo_.copysetid(); + return; + } + + InodeVolumeSpaceDeallocate task( + partitionInfo_.fsid(), partitionInfo_.partitionid(), copysetNode); + task.Init(calOpt); + + VolumeDeallocateManager::GetInstance().Register(std::move(task)); + + VLOG(3) << "Partition start volume deallocate success. partitionInfo=" + << partitionInfo_.DebugString(); + }); + + handle.wait(); } void Partition::CancelVolumeDeallocate() { diff --git a/src/common/concurrent/rw_lock.h b/src/common/concurrent/rw_lock.h index d7c47c7d3c..807afb3b8c 100644 --- a/src/common/concurrent/rw_lock.h +++ b/src/common/concurrent/rw_lock.h @@ -23,13 +23,31 @@ #ifndef SRC_COMMON_CONCURRENT_RW_LOCK_H_ #define SRC_COMMON_CONCURRENT_RW_LOCK_H_ -#include #include -#include #include +#include +#include +#include // gettid +#include "include/curve_compiler_specific.h" #include "src/common/uncopyable.h" +// Due to the mixed use of bthread and pthread in some cases, acquiring another +// bthread lock(mutex/rwlock) after acquiring a write lock on a pthread rwlock +// may result in switching the bthread coroutine, and then the operation of +// releasing the previous write lock in the other pthread will not take effect +// (implying that the write lock is still held), thus causing a deadlock. + +// Check pthread rwlock tid between wrlock and unlock +#if defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID) && \ + (ENABLE_CHECK_PTHREAD_WRLOCK_TID == 1) +#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1 +#elif !defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID) +#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1 +#else +#define CURVE_CHECK_PTHREAD_WRLOCK_TID 0 +#endif + namespace curve { namespace common { @@ -51,10 +69,21 @@ class PthreadRWLockBase : public RWLockBase { void WRLock() override { int ret = pthread_rwlock_wrlock(&rwlock_); CHECK(0 == ret) << "wlock failed: " << ret << ", " << strerror(ret); +#if CURVE_CHECK_PTHREAD_WRLOCK_TID + tid_ = gettid(); +#endif } int TryWRLock() override { - return pthread_rwlock_trywrlock(&rwlock_); + int ret = pthread_rwlock_trywrlock(&rwlock_); + if (CURVE_UNLIKELY(ret != 0)) { + return ret; + } + +#if CURVE_CHECK_PTHREAD_WRLOCK_TID + tid_ = gettid(); +#endif + return 0; } void RDLock() override { @@ -67,6 +96,19 @@ class PthreadRWLockBase : public RWLockBase { } void Unlock() override { +#if CURVE_CHECK_PTHREAD_WRLOCK_TID + if (tid_ != 0) { + const pid_t current = gettid(); + // If CHECK here is triggered, please look at the comments at the + // beginning of the file. + // In the meantime, the simplest solution might be to use + // `BthreadRWLock` locks everywhere. + CHECK(tid_ == current) + << ", tid has changed, previous tid: " << tid_ + << ", current tid: " << current; + tid_ = 0; + } +#endif pthread_rwlock_unlock(&rwlock_); } @@ -76,8 +118,14 @@ class PthreadRWLockBase : public RWLockBase { pthread_rwlock_t rwlock_; pthread_rwlockattr_t rwlockAttr_; + +#if CURVE_CHECK_PTHREAD_WRLOCK_TID + pid_t tid_ = 0; +#endif }; +#undef CURVE_CHECK_PTHREAD_WRLOCK_TID + class RWLock : public PthreadRWLockBase { public: RWLock() { @@ -122,7 +170,7 @@ class BthreadRWLock : public RWLockBase { } int TryWRLock() override { - // not support yet + LOG(WARNING) << "TryWRLock not support yet"; return EINVAL; } @@ -132,7 +180,7 @@ class BthreadRWLock : public RWLockBase { } int TryRDLock() override { - // not support yet + LOG(WARNING) << "TryRDLock not support yet"; return EINVAL; } From c88fd260ee4988508e6e16b564942cf18668dc6e Mon Sep 17 00:00:00 2001 From: hzwuhongsong Date: Mon, 6 Nov 2023 10:38:35 +0800 Subject: [PATCH 03/12] curvefs/client: fix memory leak of memcached --- curvefs/src/client/kvclient/memcache_client.h | 1 + 1 file changed, 1 insertion(+) diff --git a/curvefs/src/client/kvclient/memcache_client.h b/curvefs/src/client/kvclient/memcache_client.h index 57e82a7f44..d481f5e5e4 100644 --- a/curvefs/src/client/kvclient/memcache_client.h +++ b/curvefs/src/client/kvclient/memcache_client.h @@ -169,6 +169,7 @@ class MemCachedClient : public KVClient { LOG(ERROR) << "Get key = " << key << " error = " << *errorlog << ", get_value_len = " << value_length << ", expect_value_len = " << length; + free(res); memcached_free(tcli); tcli = nullptr; } From 9ef481803a11de679e8f251093f94a4278b408c9 Mon Sep 17 00:00:00 2001 From: hzwuhongsong Date: Tue, 7 Nov 2023 11:34:23 +0800 Subject: [PATCH 04/12] curvefs/client: fix unittest failed --- curvefs/test/metaserver/copyset/raft_cli_service2_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curvefs/test/metaserver/copyset/raft_cli_service2_test.cpp b/curvefs/test/metaserver/copyset/raft_cli_service2_test.cpp index 86162d3686..5a3f8e5d85 100644 --- a/curvefs/test/metaserver/copyset/raft_cli_service2_test.cpp +++ b/curvefs/test/metaserver/copyset/raft_cli_service2_test.cpp @@ -472,7 +472,7 @@ TEST_F(RaftCliService2Test, ChangePeerTest) { // change peer succeed { - sleep(20); + sleep(60); ChangePeersRequest2 request; ChangePeersResponse2 response; SetRequestPoolAndCopysetId(&request); From 53f8b94e998cbc46f1201b1618561bbe34fae4d3 Mon Sep 17 00:00:00 2001 From: Wine93 Date: Wed, 1 Nov 2023 14:15:53 +0800 Subject: [PATCH 05/12] docker: added dockerfile for build in ubuntu22. Signed-off-by: Wine93 Signed-off-by: fine97 --- .obm.cfg | 2 +- curvefs/sdk/README.md | 6 +-- docker/ubuntu22/compile/Dockerfile | 10 +++++ docker/ubuntu22/compile/Makefile | 4 ++ docker/ubuntu22/compile/setup.sh | 63 ++++++++++++++++++++++++++++++ util/playground.sh | 33 +++++++++------- 6 files changed, 99 insertions(+), 19 deletions(-) create mode 100644 docker/ubuntu22/compile/Dockerfile create mode 100644 docker/ubuntu22/compile/Makefile create mode 100644 docker/ubuntu22/compile/setup.sh diff --git a/.obm.cfg b/.obm.cfg index ada426f8e5..1803905eb0 100644 --- a/.obm.cfg +++ b/.obm.cfg @@ -1,2 +1,2 @@ container_name: curve-build-playground.master -container_image: opencurvedocker/curve-base:build-debian11 +container_image: opencurvedocker/curve-build:ubuntu22 diff --git a/curvefs/sdk/README.md b/curvefs/sdk/README.md index c58aa8feba..6848362017 100644 --- a/curvefs/sdk/README.md +++ b/curvefs/sdk/README.md @@ -6,12 +6,12 @@ How to build ``` bash $ git clone git@github.com:opencurve/curve.git -$ cd curve -$ make dep stor=fs +$ make playground +$ make ci-dep stor=fs $ make sdk ``` -It will generate a jar after build success: +It will generate a jar package after build success: ``` Build SDK success => /curve/curvefs/sdk/output/curvefs-hadoop-1.0-SNAPSHOT.jar diff --git a/docker/ubuntu22/compile/Dockerfile b/docker/ubuntu22/compile/Dockerfile new file mode 100644 index 0000000000..76f34e44be --- /dev/null +++ b/docker/ubuntu22/compile/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:22.04 + +LABEL maintainer="Wine93 " + +ENV TZ=Asia/Shanghai \ + LANG=C.UTF-8 \ + LC_ALL=C.UTF-8 + +COPY setup.sh .setup.sh +RUN bash .setup.sh diff --git a/docker/ubuntu22/compile/Makefile b/docker/ubuntu22/compile/Makefile new file mode 100644 index 0000000000..3f2bfbacfe --- /dev/null +++ b/docker/ubuntu22/compile/Makefile @@ -0,0 +1,4 @@ +.PHONY: build + +build: + docker build -t opencurvedocker/curve-build:ubuntu22 . diff --git a/docker/ubuntu22/compile/setup.sh b/docker/ubuntu22/compile/setup.sh new file mode 100644 index 0000000000..ea7fe73891 --- /dev/null +++ b/docker/ubuntu22/compile/setup.sh @@ -0,0 +1,63 @@ +#!/usr/bin/env bash + +g_bazelisk_url="https://github.com/bazelbuild/bazelisk/releases/download/v1.18.0/bazelisk-linux-amd64" +g_protoc_url="https://github.com/protocolbuffers/protobuf/releases/download/v21.8/protoc-21.8-linux-x86_64.zip" + +cat << EOF > /etc/apt/sources.list +deb http://mirrors.aliyun.com/ubuntu/ jammy main restricted universe multiverse +deb-src http://mirrors.aliyun.com/ubuntu/ jammy main restricted universe multiverse +deb http://mirrors.aliyun.com/ubuntu/ jammy-security main restricted universe multiverse +deb-src http://mirrors.aliyun.com/ubuntu/ jammy-security main restricted universe multiverse +deb http://mirrors.aliyun.com/ubuntu/ jammy-updates main restricted universe multiverse +deb-src http://mirrors.aliyun.com/ubuntu/ jammy-updates main restricted universe multiverse +deb http://mirrors.aliyun.com/ubuntu/ jammy-proposed main restricted universe multiverse +deb-src http://mirrors.aliyun.com/ubuntu/ jammy-proposed main restricted universe multiverse +deb http://mirrors.aliyun.com/ubuntu/ jammy-backports main restricted universe multiverse +deb-src http://mirrors.aliyun.com/ubuntu/ jammy-backports main restricted universe multiverse +EOF + +apt-get clean +apt-get -y update +apt-get -y install --no-install-recommends \ + bison \ + build-essential \ + cmake \ + default-jdk \ + flex \ + git \ + golang \ + libcurl4-gnutls-dev \ + libfiu-dev \ + libfuse3-dev \ + libhashkit-dev \ + liblz4-dev \ + libsnappy-dev \ + libssl-dev \ + libz-dev \ + make \ + maven \ + musl \ + musl-dev \ + musl-tools \ + python3-pip \ + sudo \ + tree \ + unzip \ + uuid-dev \ + vim \ + wget +apt-get autoremove -y + +wget "${g_bazelisk_url}" -O /usr/bin/bazel +chmod a+x /usr/bin/bazel + +g_protoc_zip="/tmp/protoc.zip" +wget "${g_protoc_url}" -O ${g_protoc_zip} +unzip ${g_protoc_zip} "bin/protoc" -d /usr +rm -f ${g_protoc_zip} + +cat << EOF >> ~/.bashrc +export JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64" +export GOPATH=${HOME}/go +export PATH=\$JAVA_HOME/bin:\$PATH +EOF diff --git a/util/playground.sh b/util/playground.sh index 8b1680e67d..c8d584c1ab 100755 --- a/util/playground.sh +++ b/util/playground.sh @@ -2,13 +2,11 @@ # Copyright (C) 2023 Jingli Chen (Wine93), NetEase Inc. -# see also: https://github.com/Burnett01/rsync-deployments/issues/21 - ############################ GLOBAL VARIABLES g_obm_cfg=".obm.cfg" g_worker_dir="/curve" g_container_name="curve-build-playground.master" -g_container_image="opencurvedocker/curve-base:build-debian11" +g_container_image="opencurvedocker/curve-build:ubuntu22" g_init_script=$(cat << EOF useradd -m -s /bin/bash -N -u $UID $USER echo "${USER} ALL=(ALL) NOPASSWD: ALL" > /etc/sudoers @@ -17,15 +15,22 @@ chmod g+w /etc/passwd echo 'alias ls="ls --color"' >> /home/${USER}/.bashrc EOF ) -g_install_script=$(cat << EOF -apt-get update -apt-get -y install tree rsync golang jq vim python3-pip maven >/dev/null -curl -sSL https://bit.ly/install-xq | sudo bash >/dev/null 2>&1 -pip3 install cpplint >/dev/null 2>/dev/null -EOF -) ############################ BASIC FUNCTIONS +msg() { + printf '%b' "$1" >&2 +} + +success() { + msg "\33[32m[✔]\33[0m ${1}${2}" +} + +die() { + msg "\33[31m[✘]\33[0m ${1}${2}" + exit 1 +} + +############################ FUNCTIONS parse_cfg() { local args=`getopt -o v: --long version: -n "playground.sh" -- "$@"` eval set -- "${args}" @@ -51,13 +56,13 @@ create_container() { --env "UID=$(id -u)" \ --env "USER=${USER}" \ --env "TZ=Asia/Shanghai" \ - --hostname "playground" \ + --hostname "${g_container_name}" \ --name "${g_container_name}" \ --workdir ${g_worker_dir} \ "${g_container_image}" + docker exec "${g_container_name}" bash -c "${g_init_script}" - docker exec "${g_container_name}" bash -c "${g_install_script}" - success "create ${g_container_name} (${g_container_image}) success :)" + success "create ${g_container_name} (${g_container_image}) success :)\n" } enter_container() { @@ -68,9 +73,7 @@ enter_container() { "${g_container_name}" /bin/bash } - main() { - source "util/basic.sh" parse_cfg "$@" create_container enter_container From f160bb65a0baa5d7ea14c4a30b7d047646d88516 Mon Sep 17 00:00:00 2001 From: Wine93 Date: Wed, 1 Nov 2023 14:16:39 +0800 Subject: [PATCH 06/12] build: fixed missing some compile params for building curvefs sdk. Signed-off-by: Wine93 Signed-off-by: fine97 --- curvefs/sdk/java/native/BUILD | 2 ++ curvefs/sdk/java/pom.xml | 3 +++ 2 files changed, 5 insertions(+) diff --git a/curvefs/sdk/java/native/BUILD b/curvefs/sdk/java/native/BUILD index 3361aeea44..13dcef4778 100644 --- a/curvefs/sdk/java/native/BUILD +++ b/curvefs/sdk/java/native/BUILD @@ -26,6 +26,8 @@ cc_binary( copts = CURVE_DEFAULT_COPTS, linkopts = [ "-Wl,-rpath=/tmp/libcurvefs,--disable-new-dtags", + "-L/usr/lib/x86_64-linux-gnu/", + "-lhashkit", ], deps = [ "@com_google_absl//absl/cleanup", diff --git a/curvefs/sdk/java/pom.xml b/curvefs/sdk/java/pom.xml index b3f15f585c..ecf8be0043 100644 --- a/curvefs/sdk/java/pom.xml +++ b/curvefs/sdk/java/pom.xml @@ -30,6 +30,9 @@ native/build + + src/main/resources + From b2200b0dfde61ddb72f0c1243c337c37def1178d Mon Sep 17 00:00:00 2001 From: fine97 Date: Wed, 8 Nov 2023 15:00:45 +0800 Subject: [PATCH 07/12] curvefs/sdk: add maven build plugin,optimize the logic of parsing input parameters in flink Signed-off-by: fine97 --- curvefs/sdk/java/pom.xml | 15 +++++++ .../fs/flink/CurveFileSystemFactory.java | 39 +++++++++++-------- .../fs/flink/CurveFileSystemTableFactory.java | 16 ++++++++ .../curve/fs/hadoop/CurveFSInputStream.java | 7 ++-- .../curve/fs/hadoop/CurveFSTalker.java | 9 +++-- .../curve/fs/hadoop/CurveFileSystem.java | 29 +++++++------- .../io/opencurve/curve/fs/hadoop/Main.java | 7 ---- curvefs/sdk/libcurvefs/libcurvefs.cpp | 4 +- 8 files changed, 80 insertions(+), 46 deletions(-) delete mode 100644 curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/Main.java diff --git a/curvefs/sdk/java/pom.xml b/curvefs/sdk/java/pom.xml index ecf8be0043..a3ed98aea9 100644 --- a/curvefs/sdk/java/pom.xml +++ b/curvefs/sdk/java/pom.xml @@ -34,6 +34,21 @@ src/main/resources + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + + diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java index ac42dfaf82..2a79fcc1c4 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemFactory.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.opencurve.curve.fs.flink; import io.opencurve.curve.fs.hadoop.CurveFileSystem; @@ -10,28 +26,17 @@ import java.net.URI; public class CurveFileSystemFactory implements FileSystemFactory { - private org.apache.hadoop.conf.Configuration conf; - + private org.apache.hadoop.conf.Configuration conf = new Configuration(); private static final String CURVE_FS_CONFIG_PREFIXES = "curvefs."; private static final String FLINK_CONFIG_PREFIXES = "fs."; public static String SCHEME = "curvefs"; @Override public void configure(org.apache.flink.configuration.Configuration config) { - conf = new Configuration(); - if (config != null) { - for (String key : config.keySet()) { - if (key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES)) { - String value = config.getString(key, null); - if (value != null) { - if (CurveFileSystem.class.getCanonicalName().equals(value.trim())) { - SCHEME = key.split("\\.")[1]; - } - conf.set(key, value); - } - } - } - } + config.keySet() + .stream() + .filter(key -> key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES)) + .forEach(key -> conf.set(key, config.getString(key, ""))); } @Override @@ -45,4 +50,4 @@ public FileSystem create(URI uri) throws IOException { fs.initialize(uri, conf); return new HadoopFileSystem(fs); } -} +} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java index da68151bbd..d065492109 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/flink/CurveFileSystemTableFactory.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.opencurve.curve.fs.flink; import org.apache.flink.connector.file.table.FileSystemTableFactory; diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java index 2dd1be8d1d..25ad56564f 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSInputStream.java @@ -59,8 +59,6 @@ public class CurveFSInputStream extends FSInputStream { */ public CurveFSInputStream(Configuration conf, CurveFSProto curvefs, int fh, long flength, int bufferSize) { - // Whoever's calling the constructor is responsible for doing the actual curve_open - // call and providing the file handle. fileLength = flength; fileHandle = fh; closed = false; @@ -73,6 +71,7 @@ public CurveFSInputStream(Configuration conf, CurveFSProto curvefs, /** Curve likes things to be closed before it shuts down, * so closing the IOStream stuff voluntarily in a finalizer is good */ + @Override protected void finalize() throws Throwable { try { if (!closed) { @@ -91,7 +90,6 @@ private synchronized boolean fillBuffer() throws IOException { bufValid = 0; - // attempt to reset to old position. If it fails, too bad. curve.lseek(fileHandle, curvePos, CurveFSMount.SEEK_SET); throw new IOException("Failed to fill read buffer! Error code:" + err); } @@ -102,6 +100,7 @@ private synchronized boolean fillBuffer() throws IOException { /* * Get the current position of the stream. */ + @Override public synchronized long getPos() throws IOException { return curvePos - bufValid + bufPos; } @@ -117,6 +116,7 @@ public synchronized int available() throws IOException { return (int) (fileLength - getPos()); } + @Override public synchronized void seek(long targetPos) throws IOException { LOG.trace("CurveInputStream.seek: Seeking to position " + targetPos + " on fd " + fileHandle); @@ -142,6 +142,7 @@ public synchronized void seek(long targetPos) throws IOException { * they'll be dealt with before anybody even tries to call this method! * @return false. */ + @Override public synchronized boolean seekToNewSource(long targetPos) { return false; } diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java index 90366cc210..26d0492142 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFSTalker.java @@ -29,6 +29,7 @@ import io.opencurve.curve.fs.libfs.CurveFSStat; import io.opencurve.curve.fs.libfs.CurveFSStatVFS; +import java.util.UUID; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; @@ -37,6 +38,7 @@ class CurveFSTalker extends CurveFSProto { private CurveFSMount mount; private String fsname = null; + private String mountpoint = null; private boolean inited = false; private static final String PREFIX_KEY = "curvefs"; @@ -72,14 +74,15 @@ void initialize(URI uri, Configuration conf) throws IOException { if (null == fsname || fsname.isEmpty()) { throw new IOException("curvefs.name is not set"); } - mount.mount(fsname, "/"); + mountpoint = UUID.randomUUID().toString(); + mount.mount(fsname, mountpoint); inited = true; } @Override void shutdown() throws IOException { if (inited) { - mount.umount(fsname, "/"); + mount.umount(fsname, mountpoint); mount = null; inited = false; } @@ -179,4 +182,4 @@ void chown(Path path, int uid, int gid) throws IOException { void rename(Path src, Path dst) throws IOException { mount.rename(tostr(src), tostr(dst)); } -} +} \ No newline at end of file diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java index 09df042c1b..fc031d38d8 100644 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java +++ b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/CurveFileSystem.java @@ -59,12 +59,14 @@ private Path makeAbsolute(Path path) { return new Path(workingDir, path); } + @Override public URI getUri() { return uri; } + @Override public String getScheme() { - return uri.getScheme(); + return "hdfs"; } @Override @@ -85,14 +87,12 @@ public void initialize(URI uri, Configuration conf) throws IOException { this.workingDir = getHomeDirectory(); } - + @Override public FSDataInputStream open(Path path, int bufferSize) throws IOException { path = makeAbsolute(path); - // throws filenotfoundexception if path is a directory int fd = curve.open(path, CurveFSMount.O_RDONLY, 0); - /* get file size */ CurveFSStat stat = new CurveFSStat(); curve.fstat(fd, stat); @@ -102,10 +102,11 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException { @Override public void close() throws IOException { - super.close(); // this method does stuff, make sure it's run! + super.close(); curve.shutdown(); } + @Override public FSDataOutputStream append(Path path, int bufferSize, Progressable progress) throws IOException { path = makeAbsolute(path); @@ -122,6 +123,7 @@ public FSDataOutputStream append(Path path, int bufferSize, Progressable progres return new FSDataOutputStream(ostream, statistics); } + @Override public Path getWorkingDirectory() { return workingDir; } @@ -144,6 +146,7 @@ public boolean mkdirs(Path f) throws IOException { return mkdirs(f, perms); } + @Override public FileStatus getFileStatus(Path path) throws IOException { path = makeAbsolute(path); @@ -160,7 +163,7 @@ public FileStatus getFileStatus(Path path) throws IOException { return status; } - + @Override public FileStatus[] listStatus(Path path) throws IOException { path = makeAbsolute(path); @@ -174,12 +177,10 @@ public FileStatus[] listStatus(Path path) throws IOException { for (int i = 0; i < status.length; i++) { status[i] = getFileStatus(new Path(path, dirlist[i])); } - curve.shutdown(); return status; } else { throw new FileNotFoundException("File " + path + " does not exist."); } - } @Override @@ -208,9 +209,9 @@ public void setTimes(Path path, long mtime, long atime) throws IOException { curve.setattr(path, stat, mask); } + @Override public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - path = makeAbsolute(path); boolean exists = exists(path); @@ -268,6 +269,7 @@ public void setOwner(Path path, String username, String groupname) throws IOExce } @Deprecated + @Override public FSDataOutputStream createNonRecursive(Path path, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, @@ -278,7 +280,7 @@ public FSDataOutputStream createNonRecursive(Path path, FsPermission permission, if (parent != null) { CurveFSStat stat = new CurveFSStat(); - curve.lstat(parent, stat); // handles FileNotFoundException case + curve.lstat(parent, stat); if (stat.isFile()) { throw new FileAlreadyExistsException(parent.toString()); } @@ -314,14 +316,15 @@ public boolean rename(Path src, Path dst) throws IOException { } @Deprecated + @Override public boolean delete(Path path) throws IOException { return delete(path, false); } + @Override public boolean delete(Path path, boolean recursive) throws IOException { path = makeAbsolute(path); - /* path exists? */ FileStatus status; try { status = getFileStatus(path); @@ -329,13 +332,11 @@ public boolean delete(Path path, boolean recursive) throws IOException { return false; } - /* we're done if its a file */ if (status.isFile()) { curve.unlink(path); return true; } - /* get directory contents */ FileStatus[] dirlist = listStatus(path); if (dirlist == null) { return false; @@ -383,6 +384,6 @@ protected int getDefaultPort() { @Override public String getCanonicalServiceName() { - return null; // Does not support Token + return null; } } diff --git a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/Main.java b/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/Main.java deleted file mode 100644 index d488e309dc..0000000000 --- a/curvefs/sdk/java/src/main/java/io/opencurve/curve/fs/hadoop/Main.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.opencurve.curve.fs.hadoop; - -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} diff --git a/curvefs/sdk/libcurvefs/libcurvefs.cpp b/curvefs/sdk/libcurvefs/libcurvefs.cpp index d53c0b51b8..fb54c3b509 100644 --- a/curvefs/sdk/libcurvefs/libcurvefs.cpp +++ b/curvefs/sdk/libcurvefs/libcurvefs.cpp @@ -143,7 +143,7 @@ int curvefs_open(uintptr_t instance_ptr, } } - uint64_t fd; + uint64_t fd = 0; rc = mount->vfs->Open(path, flags, mode, &fd); if (rc != CURVEFS_ERROR::OK) { return SysErr(rc); @@ -164,7 +164,7 @@ ssize_t curvefs_read(uintptr_t instance_ptr, int fd, char* buffer, size_t count) { - size_t nread; + size_t nread = 0; auto mount = get_instance(instance_ptr); auto rc = mount->vfs->Read(fd, buffer, count, &nread); if (rc != CURVEFS_ERROR::OK) { From 68457c453a1e07da56c74bb33b5da69ebe7fedb2 Mon Sep 17 00:00:00 2001 From: Cyber-SiKu Date: Wed, 8 Nov 2023 17:55:47 +0800 Subject: [PATCH 08/12] [fix]curvefs/client: warmup process Fixed a bug where warm-up progress may not be added Signed-off-by: Cyber-SiKu --- curvefs/src/client/warmup/warmup_manager.cpp | 6 ++++-- curvefs/src/client/warmup/warmup_manager.h | 5 ++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/curvefs/src/client/warmup/warmup_manager.cpp b/curvefs/src/client/warmup/warmup_manager.cpp index 1f7dde85cb..bcc9fb9d73 100644 --- a/curvefs/src/client/warmup/warmup_manager.cpp +++ b/curvefs/src/client/warmup/warmup_manager.cpp @@ -70,7 +70,8 @@ bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key, return false; } // add warmup Progress - if (AddWarmupProcess(key, path, type)) { + WriteLockGuard lock(inode2ProgressMutex_); + if (AddWarmupProcessLocked(key, path, type)) { LOG(INFO) << "add warmup list task:" << key; WriteLockGuard lock(warmupFilelistDequeMutex_); auto iter = FindWarmupFilelistByKeyLocked(key); @@ -96,7 +97,8 @@ bool WarmupManagerS3Impl::AddWarmupFile(fuse_ino_t key, const std::string& path, return false; } // add warmup Progress - if (AddWarmupProcess(key, path, type)) { + WriteLockGuard lock(inode2ProgressMutex_); + if (AddWarmupProcessLocked(key, path, type)) { LOG(INFO) << "add warmup single task:" << key; FetchDentryEnqueue(key, path); } diff --git a/curvefs/src/client/warmup/warmup_manager.h b/curvefs/src/client/warmup/warmup_manager.h index 5cb7342fd0..f55752c801 100644 --- a/curvefs/src/client/warmup/warmup_manager.h +++ b/curvefs/src/client/warmup/warmup_manager.h @@ -285,9 +285,8 @@ class WarmupManager { * @return true * @return false warmupProcess has been added */ - virtual bool AddWarmupProcess(fuse_ino_t key, const std::string& path, - WarmupStorageType type) { - WriteLockGuard lock(inode2ProgressMutex_); + virtual bool AddWarmupProcessLocked(fuse_ino_t key, const std::string& path, + WarmupStorageType type) { auto retPg = inode2Progress_.emplace(key, WarmupProgress(type, path)); return retPg.second; } From 015cd25c055620cb5acd71b75317376baa274f32 Mon Sep 17 00:00:00 2001 From: h0hmj Date: Fri, 10 Nov 2023 11:02:23 +0800 Subject: [PATCH 09/12] tools-v2: fix update fs bug Signed-off-by: h0hmj --- tools-v2/internal/error/error.go | 13 +++++++++++++ tools-v2/pkg/cli/command/curvefs/update/fs/fs.go | 6 +++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/tools-v2/internal/error/error.go b/tools-v2/internal/error/error.go index 7a1b71d870..0e4e44f9e0 100644 --- a/tools-v2/internal/error/error.go +++ b/tools-v2/internal/error/error.go @@ -573,6 +573,19 @@ var ( } return NewRpcReultCmdError(statusCode, message) } + ErrUpdateFs = func(statusCode int) *CmdError { + var message string + code := mds.FSStatusCode(statusCode) + switch code { + case mds.FSStatusCode_OK: + message = "success" + case mds.FSStatusCode_NOT_FOUND: + message = "fs not found!" + default: + message = fmt.Sprintf("update fs failed!, error is %s", mds.FSStatusCode_name[int32(code)]) + } + return NewRpcReultCmdError(statusCode, message) + } ErrGetCopysetsInfo = func(statusCode int) *CmdError { code := topology.TopoStatusCode(statusCode) message := fmt.Sprintf("get copysets info failed: status code is %s", code.String()) diff --git a/tools-v2/pkg/cli/command/curvefs/update/fs/fs.go b/tools-v2/pkg/cli/command/curvefs/update/fs/fs.go index f4b5ad2c2d..54628fc1d6 100644 --- a/tools-v2/pkg/cli/command/curvefs/update/fs/fs.go +++ b/tools-v2/pkg/cli/command/curvefs/update/fs/fs.go @@ -83,7 +83,7 @@ func (fCmd *FsCommand) AddFlags() { func (fCmd *FsCommand) Init(cmd *cobra.Command, args []string) error { // args check - fsName, _ := cmd.Flags().GetString("fsName") + fsName, _ := cmd.Flags().GetString(config.CURVEFS_FSNAME) request := &mds.UpdateFsInfoRequest{ FsName: &fsName, } @@ -131,10 +131,10 @@ func (fCmd *FsCommand) RunCommand(cmd *cobra.Command, args []string) error { } response := result.(*mds.UpdateFsInfoResponse) - errCreate := cmderror.ErrCreateFs(int(response.GetStatusCode())) + errUpdateFs := cmderror.ErrUpdateFs(int(response.GetStatusCode())) row := map[string]string{ cobrautil.ROW_FS_NAME: fCmd.Rpc.Request.GetFsName(), - cobrautil.ROW_RESULT: errCreate.Message, + cobrautil.ROW_RESULT: errUpdateFs.Message, } fCmd.TableNew.Append(cobrautil.Map2List(row, fCmd.Header)) From 297c6aa9ee15a57cd0f8ec2e028826360f791b12 Mon Sep 17 00:00:00 2001 From: Cyber-SiKu Date: Fri, 10 Nov 2023 10:48:03 +0800 Subject: [PATCH 10/12] [fix]curvefs/client: diskcache may deadlock Signed-off-by: Cyber-SiKu --- curvefs/src/client/s3/disk_cache_manager.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/curvefs/src/client/s3/disk_cache_manager.cpp b/curvefs/src/client/s3/disk_cache_manager.cpp index 1bc2ac2473..235989ca87 100644 --- a/curvefs/src/client/s3/disk_cache_manager.cpp +++ b/curvefs/src/client/s3/disk_cache_manager.cpp @@ -427,6 +427,10 @@ void DiskCacheManager::TrimCache() { InitQosParam(); if (!IsDiskCacheSafe(kRatioLevel)) { while (!IsDiskCacheSafe(FLAGS_diskTrimRatio)) { + if (!isRunning_) { + LOG(INFO) << "trim thread end."; + return; + } UpdateDiskFsUsedRatio(); if (!cachedObjName_->GetBack(&cacheKey)) { VLOG_EVERY_N(9, 1000) << "obj is empty"; From 6098f5595450a6237dd47efeeb9ebcd87e7b0036 Mon Sep 17 00:00:00 2001 From: NaturalSelect <2145973003@qq.com> Date: Thu, 9 Nov 2023 10:02:15 +0800 Subject: [PATCH 11/12] fix(util): stop write log to stderr, when FLAGS_logtostderr is false close: #2811 Signed-off-by: NaturalSelect <2145973003@qq.com> --- curvefs/src/client/curve_fuse_op.cpp | 2 ++ curvefs/src/mds/main.cpp | 2 ++ curvefs/src/metaserver/main.cpp | 2 ++ nebd/src/part2/main.cpp | 2 ++ src/chunkserver/chunkserver.cpp | 2 ++ src/common/log_util.h | 31 ++++++++++++++++++++++++++++ src/mds/main/main.cpp | 2 ++ src/snapshotcloneserver/main.cpp | 2 ++ 8 files changed, 45 insertions(+) create mode 100644 src/common/log_util.h diff --git a/curvefs/src/client/curve_fuse_op.cpp b/curvefs/src/client/curve_fuse_op.cpp index dd249c8892..cd2128d805 100644 --- a/curvefs/src/client/curve_fuse_op.cpp +++ b/curvefs/src/client/curve_fuse_op.cpp @@ -48,6 +48,7 @@ #include "curvefs/src/common/metric_utils.h" #include "src/common/configuration.h" #include "src/common/gflags_helper.h" +#include "src/common/log_util.h" using ::curve::common::Configuration; using ::curvefs::client::CURVEFS_ERROR; @@ -152,6 +153,7 @@ int InitLog(const char *confPath, const char *argv0) { FLAGS_vlog_level = FLAGS_v; // initialize logging module + curve::common::DisableLoggingToStdErr(); google::InitGoogleLogging(argv0); bool succ = InitAccessLog(FLAGS_log_dir); diff --git a/curvefs/src/mds/main.cpp b/curvefs/src/mds/main.cpp index 725c29e89c..f506b4e5c8 100644 --- a/curvefs/src/mds/main.cpp +++ b/curvefs/src/mds/main.cpp @@ -24,6 +24,7 @@ #include #include "curvefs/src/mds/mds.h" +#include "src/common/log_util.h" #include "src/common/configuration.h" #include "curvefs/src/common/dynamic_vlog.h" @@ -64,6 +65,7 @@ int main(int argc, char **argv) { } // initialize logging module + curve::common::DisableLoggingToStdErr(); google::InitGoogleLogging(argv[0]); conf->PrintConfig(); diff --git a/curvefs/src/metaserver/main.cpp b/curvefs/src/metaserver/main.cpp index ec390ab951..5552dd5bed 100644 --- a/curvefs/src/metaserver/main.cpp +++ b/curvefs/src/metaserver/main.cpp @@ -29,6 +29,7 @@ #include "src/common/configuration.h" #include "curvefs/src/common/dynamic_vlog.h" #include "curvefs/src/common/threading.h" +#include "src/common/log_util.h" DEFINE_string(confPath, "curvefs/conf/metaserver.conf", "metaserver confPath"); DEFINE_string(ip, "127.0.0.1", "metasetver listen ip"); @@ -126,6 +127,7 @@ int main(int argc, char **argv) { FLAGS_vlog_level = FLAGS_v; // initialize logging module + curve::common::DisableLoggingToStdErr(); google::InitGoogleLogging(argv[0]); conf->PrintConfig(); diff --git a/nebd/src/part2/main.cpp b/nebd/src/part2/main.cpp index f8c742fe9a..e72bb27cbf 100644 --- a/nebd/src/part2/main.cpp +++ b/nebd/src/part2/main.cpp @@ -24,12 +24,14 @@ #include #include #include "nebd/src/part2/nebd_server.h" +#include "src/common/log_util.h" DEFINE_string(confPath, "/etc/nebd/nebd-server.conf", "nebd server conf path"); int main(int argc, char* argv[]) { // 解析参数 google::ParseCommandLineFlags(&argc, &argv, false); + curve::common::DisableLoggingToStdErr(); google::InitGoogleLogging(argv[0]); std::string confPath = FLAGS_confPath.c_str(); diff --git a/src/chunkserver/chunkserver.cpp b/src/chunkserver/chunkserver.cpp index 784628447f..398e938240 100644 --- a/src/chunkserver/chunkserver.cpp +++ b/src/chunkserver/chunkserver.cpp @@ -45,6 +45,7 @@ #include "src/common/concurrent/task_thread_pool.h" #include "src/common/curve_version.h" #include "src/common/uri_parser.h" +#include "src/common/log_util.h" using ::curve::fs::LocalFileSystem; using ::curve::fs::LocalFileSystemOption; @@ -105,6 +106,7 @@ int ChunkServer::Run(int argc, char** argv) { LoadConfigFromCmdline(&conf); // 初始化日志模块 + curve::common::DisableLoggingToStdErr(); google::InitGoogleLogging(argv[0]); // 打印参数 diff --git a/src/common/log_util.h b/src/common/log_util.h new file mode 100644 index 0000000000..458bc01a87 --- /dev/null +++ b/src/common/log_util.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SRC_COMMON_LOG_UTIL_H_ +#define SRC_COMMON_LOG_UTIL_H_ + +#include + +namespace curve { +namespace common { + inline void DisableLoggingToStdErr() { + // NOTE: https://github.com/google/glog#setting-flags + FLAGS_stderrthreshold = 3; + } +} // namespace common +} // namespace curve + +#endif // SRC_COMMON_LOG_UTIL_H_ diff --git a/src/mds/main/main.cpp b/src/mds/main/main.cpp index c0824f3bca..9897226322 100644 --- a/src/mds/main/main.cpp +++ b/src/mds/main/main.cpp @@ -24,6 +24,7 @@ #include "src/mds/server/mds.h" #include "src/mds/common/mds_define.h" +#include "src/common/log_util.h" DEFINE_string(confPath, "conf/mds.conf", "mds confPath"); DEFINE_string(mdsAddr, "127.0.0.1:6666", "mds listen addr"); @@ -107,6 +108,7 @@ int main(int argc, char **argv) { } // initialize logging module + curve::common::DisableLoggingToStdErr(); google::InitGoogleLogging(argv[0]); // reset SIGPIPE handler diff --git a/src/snapshotcloneserver/main.cpp b/src/snapshotcloneserver/main.cpp index b44468b857..3430ff0118 100644 --- a/src/snapshotcloneserver/main.cpp +++ b/src/snapshotcloneserver/main.cpp @@ -22,6 +22,7 @@ #include #include #include "src/snapshotcloneserver/snapshotclone_server.h" +#include "src/common/log_util.h" DEFINE_string(conf, "conf/snapshot_clone_server.conf", "snapshot&clone server config file path"); //NOLINT DEFINE_string(addr, "127.0.0.1:5555", "snapshotcloneserver address"); @@ -80,6 +81,7 @@ int main(int argc, char **argv) { LoadConfigFromCmdline(conf.get()); conf->PrintConfig(); conf->ExposeMetric("snapshot_clone_server_config"); + curve::common::DisableLoggingToStdErr(); google::InitGoogleLogging(argv[0]); snapshotcloneserver_main(conf); } From f79eb254ca7362d2185a6f9223d9468b960eb2de Mon Sep 17 00:00:00 2001 From: hzwuhongsong Date: Mon, 20 Nov 2023 14:28:53 +0800 Subject: [PATCH 12/12] curvefs/client: fix trash bugs --- .../src/metaserver/copyset/copyset_node.cpp | 5 +- curvefs/src/metaserver/inode_manager.cpp | 18 ++++--- curvefs/src/metaserver/inode_storage.cpp | 42 ++++++++++++++- curvefs/src/metaserver/inode_storage.h | 9 ++++ curvefs/src/metaserver/metastore.cpp | 34 +++++++++++- curvefs/src/metaserver/metastore.h | 4 ++ .../metaserver/storage/rocksdb_storage.cpp | 52 ++++++++++++++++++- .../src/metaserver/storage/rocksdb_storage.h | 19 +++++++ curvefs/src/metaserver/storage/storage.h | 12 +++++ curvefs/src/metaserver/trash.cpp | 6 ++- curvefs/src/metaserver/trash_manager.cpp | 17 ++++++ curvefs/src/metaserver/trash_manager.h | 22 ++++++++ 12 files changed, 225 insertions(+), 15 deletions(-) diff --git a/curvefs/src/metaserver/copyset/copyset_node.cpp b/curvefs/src/metaserver/copyset/copyset_node.cpp index 449886204f..bb755fcc9c 100644 --- a/curvefs/src/metaserver/copyset/copyset_node.cpp +++ b/curvefs/src/metaserver/copyset/copyset_node.cpp @@ -161,6 +161,7 @@ bool CopysetNode::Init(const CopysetNodeOptions& options) { } bool CopysetNode::Start() { + VLOG(3) << "copyset is starting, copyset: " << name_; if (!raftNode_) { LOG(ERROR) << "RaftNode didn't created, copyset: " << name_; return false; @@ -170,8 +171,8 @@ bool CopysetNode::Start() { LOG(ERROR) << "Fail to init raft node, copyset: " << name_; return false; } - - LOG(INFO) << "Run copyset success, copyset: " << name_; + metaStore_->LoadAll(); + VLOG(3) << "copyset start success, copyset: " << name_; return true; } diff --git a/curvefs/src/metaserver/inode_manager.cpp b/curvefs/src/metaserver/inode_manager.cpp index 91fb83a784..ec334bc509 100644 --- a/curvefs/src/metaserver/inode_manager.cpp +++ b/curvefs/src/metaserver/inode_manager.cpp @@ -312,7 +312,7 @@ MetaStatusCode InodeManager::DeleteInode(uint32_t fsId, uint64_t inodeId, MetaStatusCode InodeManager::UpdateInode(const UpdateInodeRequest& request, int64_t logIndex) { CHECK_APPLIED(); - VLOG(9) << "update inode, fsid: " << request.fsid() + VLOG(0) << "whs update inode, fsid: " << request.fsid() << ", inodeid: " << request.inodeid(); NameLockGuard lg(inodeLock_, GetInodeLockName(request.fsid(), request.inodeid())); @@ -388,11 +388,6 @@ MetaStatusCode InodeManager::UpdateInode(const UpdateInodeRequest& request, } } - if (s3NeedTrash) { - trash_->Add(old.fsid(), old.inodeid(), old.dtime()); - --(*type2InodeNum_)[old.type()]; - } - const S3ChunkInfoMap &map2add = request.s3chunkinfoadd(); const S3ChunkInfoList *list2add; VLOG(9) << "UpdateInode inode " << old.inodeid() << " map2add size " @@ -443,7 +438,16 @@ MetaStatusCode InodeManager::UpdateInode(const UpdateInodeRequest& request, return MetaStatusCode::STORAGE_INTERNAL_ERROR; } } - VLOG(9) << "UpdateInode success, " << request.ShortDebugString(); + + if (s3NeedTrash) { +VLOG(0) << "whs add need trash, " << request.ShortDebugString(); + + inodeStorage_->UpdateDeletingKey(old, logIndex); + trash_->Add(old.fsid(), old.inodeid(), old.dtime()); + --(*type2InodeNum_)[old.type()]; + } + + VLOG(0) << "whs UpdateInode success, " << request.ShortDebugString(); return MetaStatusCode::OK; } diff --git a/curvefs/src/metaserver/inode_storage.cpp b/curvefs/src/metaserver/inode_storage.cpp index 8e5da8a913..2e30a23c15 100644 --- a/curvefs/src/metaserver/inode_storage.cpp +++ b/curvefs/src/metaserver/inode_storage.cpp @@ -185,6 +185,46 @@ MetaStatusCode InodeStorage::Insert(const Inode& inode, int64_t logIndex) { return MetaStatusCode::STORAGE_INTERNAL_ERROR; } +MetaStatusCode InodeStorage::UpdateDeletingKey(const Inode& inode, int64_t logIndex) { + WriteLockGuard lg(rwLock_); + Key4Inode key(inode.fsid(), inode.inodeid()); + std::string skey = conv_.SerializeToString(key); + VLOG(9) << "update deleting key, " << inode.inodeid(); + const char* step = "Begin transaction"; + std::shared_ptr txn; + txn = kvStorage_->BeginTransaction(); + if (txn == nullptr) { + LOG(ERROR) << "Begin transaction failed"; + return MetaStatusCode::STORAGE_INTERNAL_ERROR; + } + auto rc = txn->HSetDeleting(table4Inode_, skey , inode); + step = "insert inode "; + if (rc.ok()) { + // delete key + // rc = DeleteInternal(txn.get(), key); + rc = txn->HDel(table4Inode_, skey); + step = "delete inode "; + } + if (rc.ok()) { + rc = SetAppliedIndex(txn.get(), logIndex); + step = "Insert applied index to transaction"; + } + if (rc.ok()) { + rc = txn->Commit(); + step = "commit"; + } + if (rc.ok()) { + VLOG(0) << "update deleting key ok, " << inode.inodeid(); + return MetaStatusCode::OK; + } + LOG(ERROR) << step << "failed, status = " << rc.ToString(); + if (txn != nullptr && !txn->Rollback().ok()) { + LOG(ERROR) << "Rollback delete inode transaction failed, status = " + << rc.ToString(); + } + return MetaStatusCode::STORAGE_INTERNAL_ERROR; +} + MetaStatusCode InodeStorage::Get(const Key4Inode& key, Inode* inode) { ReadLockGuard lg(rwLock_); std::string skey = conv_.SerializeToString(key); @@ -471,7 +511,6 @@ MetaStatusCode InodeStorage::Clear() { // because if we fail stop, we will replay // raft logs and clear it again WriteLockGuard lg(rwLock_); - Status s = kvStorage_->HClear(table4Inode_); if (!s.ok()) { LOG(ERROR) << "InodeStorage clear inode table failed, status = " @@ -492,7 +531,6 @@ MetaStatusCode InodeStorage::Clear() { << s.ToString(); return MetaStatusCode::STORAGE_INTERNAL_ERROR; } - s = kvStorage_->HClear(table4InodeAuxInfo_); if (!s.ok()) { LOG(ERROR) diff --git a/curvefs/src/metaserver/inode_storage.h b/curvefs/src/metaserver/inode_storage.h index 38ad3c5f56..267fba88c4 100644 --- a/curvefs/src/metaserver/inode_storage.h +++ b/curvefs/src/metaserver/inode_storage.h @@ -43,6 +43,7 @@ #include "curvefs/src/metaserver/storage/utils.h" #include "src/common/concurrent/rw_lock.h" +#define DELETING_PREFIX "deleting_" namespace curvefs { namespace metaserver { @@ -76,6 +77,14 @@ class InodeStorage { */ MetaStatusCode Insert(const Inode& inode, int64_t logIndex); + /** + * @brief update deleting inode key in storage + * @param[in] inode: the inode want to update + * @param[in] logIndex: the index of raft log + * @return + */ + MetaStatusCode UpdateDeletingKey(const Inode& inode, int64_t logIndex); + /** * @brief get inode from storage * @param[in] key: the key of inode want to get diff --git a/curvefs/src/metaserver/metastore.cpp b/curvefs/src/metaserver/metastore.cpp index 875f71cffc..2fd18e7acd 100644 --- a/curvefs/src/metaserver/metastore.cpp +++ b/curvefs/src/metaserver/metastore.cpp @@ -60,6 +60,7 @@ using KVStorage = ::curvefs::metaserver::storage::KVStorage; using Key4S3ChunkInfoList = ::curvefs::metaserver::storage::Key4S3ChunkInfoList; using ::curvefs::metaserver::storage::MemoryStorage; +using ::curvefs::metaserver::storage::NameGenerator; using ::curvefs::metaserver::storage::RocksDBStorage; using ::curvefs::metaserver::storage::StorageOptions; @@ -87,6 +88,8 @@ MetaStoreImpl::MetaStoreImpl(copyset::CopysetNode *node, storageOptions_(storageOptions) {} bool MetaStoreImpl::Load(const std::string &pathname) { +LOG(ERROR) << "whs load start"; + // Load from raft snap file to memory WriteLockGuard writeLockGuard(rwLock_); MetaStoreFStream fstream(&partitionMap_, kvStorage_, @@ -147,6 +150,9 @@ bool MetaStoreImpl::Load(const std::string &pathname) { } startCompacts(); + + +LOG(ERROR) << "whs load end"; return true; } @@ -859,7 +865,33 @@ bool MetaStoreImpl::InitStorage() { return false; } - return kvStorage_->Open(); + + if (!kvStorage_->Open()) { + return false; + } + + return true; +} + +void MetaStoreImpl::BuildTrashList() { + + std::shared_ptr nameGen = std::make_shared(0); + + std::shared_ptr inodeStorage = + std::make_shared(kvStorage_, nameGen, 1); + + auto trash = std::make_shared(inodeStorage); + + TrashManager::GetInstance().BuildAbortTrash(kvStorage_, trash); +} + +void MetaStoreImpl::LoadAll() { + LOG(ERROR) << "InitStorage start"; + // kvStorage_->LoadAll(); + LOG(ERROR) << "InitStorage start01"; + + BuildTrashList(); + LOG(ERROR) << "InitStorage start02"; } } // namespace metaserver diff --git a/curvefs/src/metaserver/metastore.h b/curvefs/src/metaserver/metastore.h index a13c0a4980..13ac6e111d 100644 --- a/curvefs/src/metaserver/metastore.h +++ b/curvefs/src/metaserver/metastore.h @@ -117,6 +117,7 @@ class MetaStore { virtual bool SaveData(const std::string& dir, std::vector* files) = 0; virtual bool Clear() = 0; + virtual void LoadAll() {}; virtual bool Destroy() = 0; virtual MetaStatusCode CreatePartition( const CreatePartitionRequest* request, @@ -223,6 +224,7 @@ class MetaStoreImpl : public MetaStore { std::vector* files) override; bool Clear() override; bool Destroy() override; + void LoadAll() override; MetaStatusCode CreatePartition(const CreatePartitionRequest* request, CreatePartitionResponse* response, @@ -351,6 +353,8 @@ class MetaStoreImpl : public MetaStore { // REQUIRES: rwLock_ is held with write permission bool ClearInternal(); + void BuildTrashList(); + private: RWLock rwLock_; // protect partitionMap_ std::shared_ptr kvStorage_; diff --git a/curvefs/src/metaserver/storage/rocksdb_storage.cpp b/curvefs/src/metaserver/storage/rocksdb_storage.cpp index 5875ba6817..5906980121 100644 --- a/curvefs/src/metaserver/storage/rocksdb_storage.cpp +++ b/curvefs/src/metaserver/storage/rocksdb_storage.cpp @@ -26,6 +26,7 @@ #include #include +#include "src/common/string_util.h" #include "src/common/timeutility.h" #include "curvefs/src/metaserver/storage/utils.h" #include "curvefs/src/metaserver/storage/storage.h" @@ -187,7 +188,7 @@ std::string RocksDBStorage::ToInternalKey(const std::string& name, std::ostringstream oss; oss << iname << kDelimiter_ << key; std::string ikey = oss.str(); - VLOG(9) << "ikey = " << ikey << " (ordered = " << ordered + VLOG(0) << "whs ikey = " << ikey << " (ordered = " << ordered << ", name = " << name << ", key = " << key << ")" << ", size = " << ikey.size(); return ikey; @@ -241,6 +242,32 @@ Status RocksDBStorage::Set(const std::string& name, return ToStorageStatus(s); } +Status RocksDBStorage::SetDeleting(const std::string& name, + const std::string& key, + const ValueType& value, + bool ordered) { + std::string svalue; + if (!inited_) { + return Status::DBClosed(); + } else if (!value.SerializeToString(&svalue)) { + return Status::SerializedFailed(); + } + + auto handle = GetColumnFamilyHandle(ordered); + + + std::string ikey = ToInternalKey(name, key, ordered); + std::string deletingKey = "deleting_" + ikey; +VLOG(0) << "whs set deleting key = " << deletingKey << ", ikey " << ikey; + RocksDBPerfGuard guard(OP_PUT); + ROCKSDB_NAMESPACE::Status s = InTransaction_ ? + txn_->Put(handle, deletingKey, svalue) : + db_->Put(dbWriteOptions_, handle, deletingKey, svalue); + return ToStorageStatus(s); + +} + + Status RocksDBStorage::Del(const std::string& name, const std::string& key, bool ordered) { @@ -273,6 +300,14 @@ std::shared_ptr RocksDBStorage::GetAll(const std::string& name, this, std::move(ikey), 0, status, ordered); } +std::shared_ptr RocksDBStorage::GetPrefix(const std::string& prefix, + bool ordered) { + int status = inited_ ? 0 : -1; + return std::make_shared( + this, std::move(prefix), 0, status, ordered); +} + + size_t RocksDBStorage::Size(const std::string& name, bool ordered) { auto iterator = GetAll(name, ordered); if (iterator->Status() != 0) { @@ -505,6 +540,21 @@ bool RocksDBStorage::Recover(const std::string& dir) { return true; } +void RocksDBStorage::LoadAll(std::list& item) { + LOG(INFO) << "LoadAll storage from"; + std::string sprefix = "deleting_"; + rocksdb::Iterator* it1 = db_->NewIterator(rocksdb::ReadOptions()); + for (it1->SeekToFirst(); it1->Valid(); it1->Next()) { + std::string key = it1->key().ToString(); + if (curve::common::StringStartWith(key, sprefix)) { + VLOG(9) << "whs recovery: " << key; + item.push_back(key); + } + LOG(ERROR) << "whs recovery: " << key; + } + // GetPrefix(prefix, false) +} + } // namespace storage } // namespace metaserver } // namespace curvefs diff --git a/curvefs/src/metaserver/storage/rocksdb_storage.h b/curvefs/src/metaserver/storage/rocksdb_storage.h index e0023dd8e2..7c2d7deb87 100644 --- a/curvefs/src/metaserver/storage/rocksdb_storage.h +++ b/curvefs/src/metaserver/storage/rocksdb_storage.h @@ -79,6 +79,8 @@ class RocksDBStorage : public KVStorage, public StorageTransaction { bool Close() override; + void LoadAll(std::list& item) override; + STORAGE_TYPE Type() override; StorageOptions GetStorageOptions() const override; @@ -92,6 +94,10 @@ class RocksDBStorage : public KVStorage, public StorageTransaction { const std::string& key, const ValueType& value) override; + Status HSetDeleting(const std::string& name, + const std::string& key, + const ValueType& value) override; + Status HDel(const std::string& name, const std::string& key) override; std::shared_ptr HGetAll(const std::string& name) override; @@ -100,6 +106,8 @@ class RocksDBStorage : public KVStorage, public StorageTransaction { Status HClear(const std::string& name) override; + std::shared_ptr GetPrefix(const std::string& prefix, + bool ordered) override; // ordered Status SGet(const std::string& name, const std::string& key, @@ -156,6 +164,11 @@ class RocksDBStorage : public KVStorage, public StorageTransaction { const ValueType& value, bool ordered); + Status SetDeleting(const std::string& name, + const std::string& key, + const ValueType& value, + bool ordered); + Status Del(const std::string& name, const std::string& key, bool ordered); @@ -219,6 +232,12 @@ inline Status RocksDBStorage::HSet(const std::string& name, return Set(name, key, value, false); } +inline Status RocksDBStorage::HSetDeleting(const std::string& name, + const std::string& key, + const ValueType& value) { + return SetDeleting(name, key, value, false); +} + inline Status RocksDBStorage::HDel(const std::string& name, const std::string& key) { return Del(name, key, false); diff --git a/curvefs/src/metaserver/storage/storage.h b/curvefs/src/metaserver/storage/storage.h index 97cef01fca..ae9559f803 100644 --- a/curvefs/src/metaserver/storage/storage.h +++ b/curvefs/src/metaserver/storage/storage.h @@ -23,6 +23,7 @@ #ifndef CURVEFS_SRC_METASERVER_STORAGE_STORAGE_H_ #define CURVEFS_SRC_METASERVER_STORAGE_STORAGE_H_ +#include #include #include #include @@ -51,6 +52,15 @@ class BaseStorage { const std::string& key, const ValueType& value) = 0; + virtual Status HSetDeleting(const std::string& name, + const std::string& key, + const ValueType& value){return Status::NotSupported();}; + + virtual std::shared_ptr GetPrefix( + const std::string& prefix, bool ordered) { + return nullptr; + } + virtual Status HDel(const std::string& name, const std::string& key) = 0; virtual std::shared_ptr HGetAll(const std::string& name) = 0; @@ -101,6 +111,8 @@ class KVStorage : public BaseStorage { virtual bool Close() = 0; + virtual void LoadAll(std::list& item) {}; + virtual StorageOptions GetStorageOptions() const = 0; virtual std::shared_ptr BeginTransaction() = 0; diff --git a/curvefs/src/metaserver/trash.cpp b/curvefs/src/metaserver/trash.cpp index 1175376e72..6e22e828c4 100644 --- a/curvefs/src/metaserver/trash.cpp +++ b/curvefs/src/metaserver/trash.cpp @@ -66,7 +66,7 @@ void TrashImpl::Add(uint32_t fsId, uint64_t inodeId, uint32_t dtime) { return; } trashItems_.push_back(item); - VLOG(6) << "Add Trash Item success, item.fsId = " << item.fsId + VLOG(9) << "add trash item success, item.fsId = " << item.fsId << ", item.inodeId = " << item.inodeId << ", item.dtime = " << item.dtime; } @@ -78,11 +78,13 @@ void TrashImpl::ScanTrash() { LockGuard lgItems(itemsMutex_); trashItems_.swap(temp); } - for (auto it = temp.begin(); it != temp.end();) { if (isStop_) { return; } + VLOG(9) << "ScanTrash , " << "item.fsId = " << it->fsId + << ", item.inodeId = " << it->inodeId + << ", item.dtime = " << it->dtime; if (NeedDelete(*it)) { MetaStatusCode ret = DeleteInodeAndData(*it); if (MetaStatusCode::NOT_FOUND == ret) { diff --git a/curvefs/src/metaserver/trash_manager.cpp b/curvefs/src/metaserver/trash_manager.cpp index 7f6341db1c..b591cf92c9 100644 --- a/curvefs/src/metaserver/trash_manager.cpp +++ b/curvefs/src/metaserver/trash_manager.cpp @@ -66,6 +66,7 @@ void TrashManager::ScanEveryTrash() { pair.second->ScanTrash(); } } + abortTrash_->ScanTrash(); } void TrashManager::Remove(uint32_t partitionId) { @@ -96,5 +97,21 @@ void TrashManager::ListItems(std::list *items) { } } +void TrashManager::BuildTrashItems() { + VLOG(3) << "build trash items start."; + std::list items; + kvStorage_->LoadAll(items); + std::vector names; + int size = 0; + for (auto iter: items) { + curve::common::SplitString(iter , ":", &names); + VLOG(3) << "build trash items: " << iter << ", size: " << ++size + << ", " << names[names.size() - 1 ] << ", " << names[names.size() - 2 ] ; + abortTrash_->Add(std::stoull(names[names.size() - 2 ]), + std::stoull(names[names.size() - 1 ]), 0); + } + VLOG(3) << "build trash items over."; +} + } // namespace metaserver } // namespace curvefs diff --git a/curvefs/src/metaserver/trash_manager.h b/curvefs/src/metaserver/trash_manager.h index ca25c0145d..5527c0eaf5 100644 --- a/curvefs/src/metaserver/trash_manager.h +++ b/curvefs/src/metaserver/trash_manager.h @@ -27,6 +27,8 @@ #include #include +#include "src/common/string_util.h" + #include "src/common/concurrent/concurrent.h" #include "curvefs/src/metaserver/trash.h" @@ -52,6 +54,15 @@ class TrashManager { << partitionId; } + void BuildAbortTrash(const std::shared_ptr &kvStorage, const std::shared_ptr &trash) { + curve::common::WriteLockGuard lg(rwLock_); + trash->Init(options_); + abortTrash_ = trash; + kvStorage_ = kvStorage; + BuildTrashItems(); + LOG(INFO) << "build abort trash"; + } + void Remove(uint32_t partitionId); void Init(const TrashOption &options) { @@ -66,6 +77,8 @@ class TrashManager { void ListItems(std::list *items); + void BuildTrashItems(); + private: void ScanLoop(); @@ -79,6 +92,15 @@ class TrashManager { InterruptibleSleeper sleeper_; std::map> trashs_; +/** + * 在metastorage的Load函数中新建trash,并进行初始化 + * + *在ScanEveryTrash函数中,获取abortTrash_中的所有item,然后调用deleteinodeanddata* +*/ +// 新建一个Trash,处理因重启导致的需要重新trash的问题 +// 注意除了新建,还需要init + std::shared_ptr abortTrash_; + std::shared_ptr kvStorage_; curve::common::RWLock rwLock_; };