Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs/sdk: some improves for curvefs sdk. #2853

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .obm.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
container_name: curve-build-playground.master
container_image: opencurvedocker/curve-base:build-debian11
container_image: opencurvedocker/curve-build:ubuntu22
6 changes: 3 additions & 3 deletions curvefs/sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ How to build

``` bash
$ git clone [email protected]:opencurve/curve.git
$ cd curve
$ make dep stor=fs
$ make playground
$ make ci-dep stor=fs
$ make sdk
```

It will generate a jar after build success:
It will generate a jar package after build success:

```
Build SDK success => /curve/curvefs/sdk/output/curvefs-hadoop-1.0-SNAPSHOT.jar
Expand Down
2 changes: 2 additions & 0 deletions curvefs/sdk/java/native/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ cc_binary(
copts = CURVE_DEFAULT_COPTS,
linkopts = [
"-Wl,-rpath=/tmp/libcurvefs,--disable-new-dtags",
"-L/usr/lib/x86_64-linux-gnu/",
"-lhashkit",
],
deps = [
"@com_google_absl//absl/cleanup",
Expand Down
3 changes: 3 additions & 0 deletions curvefs/sdk/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
<resource>
<directory>native/build</directory>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.opencurve.curve.fs.common;

public class AccessLogger {
private String classname;
private int level;

private static final String CURVEFS_DEBUG_ENV_VAR = "CURVEFS_DEBUG";

public AccessLogger(String classname, int level) {
this.classname = classname;
this.level = level;
}

public void log(String fn, Object... args) {
String value = System.getenv(CURVEFS_DEBUG_ENV_VAR);
if (!Boolean.valueOf(value)) {
return;
}

String[] params = new String[args.length];
for (int i = 0; i < args.length; i++) {
params[i] = args[i].toString();
}

String indent = " ".repeat(this.level * 4);
String param = String.join(",", params);
String message = String.format("+ %s%s.%s(%s)", indent, this.classname, fn, param);
System.out.println(message);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.opencurve.curve.fs.flink;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import io.opencurve.curve.fs.hadoop.CurveFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
Expand All @@ -12,22 +14,29 @@
public class CurveFileSystemFactory implements FileSystemFactory {
private org.apache.hadoop.conf.Configuration conf;

private static final Log LOG = LogFactory.getLog(CurveFileSystemFactory.class);

private static final String CURVE_FS_CONFIG_PREFIXES = "curvefs.";
private static final String FLINK_CONFIG_PREFIXES = "fs.";
public static String SCHEME = "curvefs";

@Override
public void configure(org.apache.flink.configuration.Configuration config) {
conf = new Configuration();

LOG.info("#### configure");

if (config != null) {
for (String key : config.keySet()) {
if (key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES)) {
String value = config.getString(key, null);
if (value != null) {
if (CurveFileSystem.class.getCanonicalName().equals(value.trim())) {
SCHEME = key.split("\\.")[1];
LOG.info("##### SCHEME KEY=" + key + ",VAL=" + value);
}
conf.set(key, value);
LOG.info("##### ADD KEY=" + key + ",VAL=" + value);
}
}
}
Expand All @@ -42,6 +51,19 @@ public String getScheme() {
@Override
public FileSystem create(URI uri) throws IOException {
CurveFileSystem fs = new CurveFileSystem();
conf.set("curvefs.name", "hadoop-test01");
conf.set("curvefs.diskCache.diskCacheType", "0");
conf.set("curvefs.s3.ak", "dd42c79a9e0d4719bc16b790e24f5329");
conf.set("curvefs.s3.sk", "ee2ddb13eb9e4708843c55eb2d4b5ec7");
conf.set("curvefs.s3.endpoint", "nos-jd.service.163.org");
conf.set("curvefs.s3.bucket_name", "curvefs-chuanmei-hadoop-01");
conf.set("curvefs.mdsOpt.rpcRetryOpt.addrs", "10.166.16.60:6700,10.166.16.61:6700,10.166.16.62:6700");
conf.set("curvefs.fs.accessLogging", "true");
conf.set("curvefs.s3.logPrefix", "/tmp");
conf.set("curvefs.client.common.logDir", "/tmp");

LOG.info("#### hardcode for keyset");

fs.initialize(uri, conf);
return new HadoopFileSystem(fs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import io.opencurve.curve.fs.libfs.CurveFSMount;
import io.opencurve.curve.fs.libfs.CurveFSStat;
import io.opencurve.curve.fs.libfs.CurveFSStatVFS;
import io.opencurve.curve.fs.common.AccessLogger;

import java.util.UUID;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
Expand All @@ -37,9 +39,11 @@
class CurveFSTalker extends CurveFSProto {
private CurveFSMount mount;
private String fsname = null;
private String mountpoint = null;
private boolean inited = false;

private static final String PREFIX_KEY = "curvefs";
private static final AccessLogger logger = new AccessLogger("CurveFSTalker", 1);

CurveFSTalker(Configuration conf, Log log) {
mount = null;
Expand Down Expand Up @@ -67,36 +71,44 @@ private void loadCfg(Configuration conf) {

@Override
void initialize(URI uri, Configuration conf) throws IOException {
logger.log("initialize", uri);

mount = new CurveFSMount();
loadCfg(conf);
if (null == fsname || fsname.isEmpty()) {
throw new IOException("curvefs.name is not set");
}
mount.mount(fsname, "/");
mountpoint = UUID.randomUUID().toString();
mount.mount(fsname, mountpoint);
inited = true;
}

@Override
void shutdown() throws IOException {
logger.log("shutdown");

if (inited) {
mount.umount(fsname, "/");
mount = null;
mount.umount(fsname, mountpoint);
//mount = null;
inited = false;
}
}

@Override
void mkdirs(Path path, int mode) throws IOException {
logger.log("mkdirs");
mount.mkdirs(tostr(path), mode);
}

@Override
void rmdir(Path path) throws IOException {
logger.log("rmdir");
mount.rmdir(tostr(path));
}

@Override
String[] listdir(Path path) throws IOException {
logger.log("listdir", path);
CurveFSStat stat = new CurveFSStat();
try {
mount.lstat(tostr(path), stat);
Expand All @@ -112,71 +124,85 @@ String[] listdir(Path path) throws IOException {

@Override
int open(Path path, int flags, int mode) throws IOException {
logger.log("open", path, flags, mode);
return mount.open(tostr(path), flags, mode);
}

@Override
long lseek(int fd, long offset, int whence) throws IOException {
logger.log("lseek", fd, offset, whence);
return mount.lseek(fd, offset, whence);
}

@Override
int write(int fd, byte[] buf, long size, long offset) throws IOException {
logger.log("lseek", fd, size, offset);
return mount.write(fd, buf, size, offset);
}

@Override
int read(int fd, byte[] buf, long size, long offset) throws IOException {
logger.log("lseek", fd, size, offset);
return mount.read(fd, buf, size, offset);
}

@Override
void fsync(int fd) throws IOException {
logger.log("fsync", fd);
mount.fsync(fd);
}

@Override
void close(int fd) throws IOException {
logger.log("close", fd);
mount.close(fd);
}

@Override
void unlink(Path path) throws IOException {
logger.log("unlink", path);
mount.unlink(tostr(path));
}

@Override
void statfs(Path path, CurveFSStatVFS stat) throws IOException {
logger.log("statfs", path);
mount.statfs(tostr(path), stat);
}

@Override
void lstat(Path path, CurveFSStat stat) throws IOException {
logger.log("lstat", path);
mount.lstat(tostr(path), stat);
}

@Override
void fstat(int fd, CurveFSStat stat) throws IOException {
logger.log("fstat", fd);
mount.fstat(fd, stat);
}

@Override
void setattr(Path path, CurveFSStat stat, int mask) throws IOException {
logger.log("setattr", path);
mount.setattr(tostr(path), stat, mask);
}

@Override
void chmod(Path path, int mode) throws IOException {
logger.log("chmod", path, mode);
mount.chmod(tostr(path), mode);
}

@Override
void chown(Path path, int uid, int gid) throws IOException {
logger.log("chown", path, uid, gid);
mount.chown(tostr(path), uid, gid);
}

@Override
void rename(Path src, Path dst) throws IOException {
logger.log("rename", src, dst);
mount.rename(tostr(src), tostr(dst));
}
}
Loading