Skip to content

Commit

Permalink
debug.
Browse files Browse the repository at this point in the history
Signed-off-by: Wine93 <[email protected]>
  • Loading branch information
Wine93 committed Oct 31, 2023
1 parent 7f4a097 commit 7cbbd41
Show file tree
Hide file tree
Showing 16 changed files with 276 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .obm.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
container_name: curve-build-playground.master
container_image: opencurvedocker/curve-base:build-debian11
container_image: opencurvedocker/curve-build:ubuntu22
6 changes: 3 additions & 3 deletions curvefs/sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ How to build

``` bash
$ git clone [email protected]:opencurve/curve.git
$ cd curve
$ make dep stor=fs
$ make playground
$ make ci-dep stor=fs
$ make sdk
```

It will generate a jar after build success:
It will generate a jar package after build success:

```
Build SDK success => /curve/curvefs/sdk/output/curvefs-hadoop-1.0-SNAPSHOT.jar
Expand Down
2 changes: 2 additions & 0 deletions curvefs/sdk/java/native/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ cc_binary(
copts = CURVE_DEFAULT_COPTS,
linkopts = [
"-Wl,-rpath=/tmp/libcurvefs,--disable-new-dtags",
"-L/usr/lib/x86_64-linux-gnu/",
"-lhashkit",
],
deps = [
"@com_google_absl//absl/cleanup",
Expand Down
3 changes: 3 additions & 0 deletions curvefs/sdk/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
<resource>
<directory>native/build</directory>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.opencurve.curve.fs.common;

public class AccessLogger {
private String classname;
private int level;

private static final String CURVEFS_DEBUG_ENV_VAR = "CURVEFS_DEBUG";

public AccessLogger(String classname, int level) {
this.classname = classname;
this.level = level;
}

public void log(String fn, Object... args) {
String value = System.getenv(CURVEFS_DEBUG_ENV_VAR);
if (!Boolean.valueOf(value)) {
return;
}

String[] params = new String[args.length];
for (int i = 0; i < args.length; i++) {
params[i] = args[i].toString();
}

String indent = " ".repeat(this.level * 4);
String param = String.join(",", params);
String message = String.format("+ %s%s.%s(%s)", indent, this.classname, fn, param);
System.out.println(message);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.opencurve.curve.fs.flink;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import io.opencurve.curve.fs.hadoop.CurveFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
Expand All @@ -12,22 +14,29 @@
public class CurveFileSystemFactory implements FileSystemFactory {
private org.apache.hadoop.conf.Configuration conf;

private static final Log LOG = LogFactory.getLog(CurveFileSystemFactory.class);

private static final String CURVE_FS_CONFIG_PREFIXES = "curvefs.";
private static final String FLINK_CONFIG_PREFIXES = "fs.";
public static String SCHEME = "curvefs";

@Override
public void configure(org.apache.flink.configuration.Configuration config) {
conf = new Configuration();

LOG.info("#### configure");

if (config != null) {
for (String key : config.keySet()) {
if (key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES)) {
String value = config.getString(key, null);
if (value != null) {
if (CurveFileSystem.class.getCanonicalName().equals(value.trim())) {
SCHEME = key.split("\\.")[1];
LOG.info("##### SCHEME KEY=" + key + ",VAL=" + value);
}
conf.set(key, value);
LOG.info("##### ADD KEY=" + key + ",VAL=" + value);
}
}
}
Expand All @@ -42,6 +51,19 @@ public String getScheme() {
@Override
public FileSystem create(URI uri) throws IOException {
CurveFileSystem fs = new CurveFileSystem();
conf.set("curvefs.name", "hadoop-test01");
conf.set("curvefs.diskCache.diskCacheType", "0");
conf.set("curvefs.s3.ak", "dd42c79a9e0d4719bc16b790e24f5329");
conf.set("curvefs.s3.sk", "ee2ddb13eb9e4708843c55eb2d4b5ec7");
conf.set("curvefs.s3.endpoint", "nos-jd.service.163.org");
conf.set("curvefs.s3.bucket_name", "curvefs-chuanmei-hadoop-01");
conf.set("curvefs.mdsOpt.rpcRetryOpt.addrs", "10.166.16.60:6700,10.166.16.61:6700,10.166.16.62:6700");
conf.set("curvefs.fs.accessLogging", "true");
conf.set("curvefs.s3.logPrefix", "/tmp");
conf.set("curvefs.client.common.logDir", "/tmp");

LOG.info("#### hardcode for keyset");

fs.initialize(uri, conf);
return new HadoopFileSystem(fs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import io.opencurve.curve.fs.libfs.CurveFSMount;
import io.opencurve.curve.fs.libfs.CurveFSStat;
import io.opencurve.curve.fs.libfs.CurveFSStatVFS;
import io.opencurve.curve.fs.common.AccessLogger;

import java.util.UUID;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
Expand All @@ -37,9 +39,11 @@
class CurveFSTalker extends CurveFSProto {
private CurveFSMount mount;
private String fsname = null;
private String mountpoint = null;
private boolean inited = false;

private static final String PREFIX_KEY = "curvefs";
private static final AccessLogger logger = new AccessLogger("CurveFSTalker", 1);

CurveFSTalker(Configuration conf, Log log) {
mount = null;
Expand Down Expand Up @@ -67,36 +71,44 @@ private void loadCfg(Configuration conf) {

@Override
void initialize(URI uri, Configuration conf) throws IOException {
logger.log("initialize", uri);

mount = new CurveFSMount();
loadCfg(conf);
if (null == fsname || fsname.isEmpty()) {
throw new IOException("curvefs.name is not set");
}
mount.mount(fsname, "/");
mountpoint = UUID.randomUUID().toString();
mount.mount(fsname, mountpoint);
inited = true;
}

@Override
void shutdown() throws IOException {
logger.log("shutdown");

if (inited) {
mount.umount(fsname, "/");
mount = null;
mount.umount(fsname, mountpoint);
//mount = null;
inited = false;
}
}

@Override
void mkdirs(Path path, int mode) throws IOException {
logger.log("mkdirs");
mount.mkdirs(tostr(path), mode);
}

@Override
void rmdir(Path path) throws IOException {
logger.log("rmdir");
mount.rmdir(tostr(path));
}

@Override
String[] listdir(Path path) throws IOException {
logger.log("listdir", path);
CurveFSStat stat = new CurveFSStat();
try {
mount.lstat(tostr(path), stat);
Expand All @@ -112,71 +124,85 @@ String[] listdir(Path path) throws IOException {

@Override
int open(Path path, int flags, int mode) throws IOException {
logger.log("open", path, flags, mode);
return mount.open(tostr(path), flags, mode);
}

@Override
long lseek(int fd, long offset, int whence) throws IOException {
logger.log("lseek", fd, offset, whence);
return mount.lseek(fd, offset, whence);
}

@Override
int write(int fd, byte[] buf, long size, long offset) throws IOException {
logger.log("lseek", fd, size, offset);
return mount.write(fd, buf, size, offset);
}

@Override
int read(int fd, byte[] buf, long size, long offset) throws IOException {
logger.log("lseek", fd, size, offset);
return mount.read(fd, buf, size, offset);
}

@Override
void fsync(int fd) throws IOException {
logger.log("fsync", fd);
mount.fsync(fd);
}

@Override
void close(int fd) throws IOException {
logger.log("close", fd);
mount.close(fd);
}

@Override
void unlink(Path path) throws IOException {
logger.log("unlink", path);
mount.unlink(tostr(path));
}

@Override
void statfs(Path path, CurveFSStatVFS stat) throws IOException {
logger.log("statfs", path);
mount.statfs(tostr(path), stat);
}

@Override
void lstat(Path path, CurveFSStat stat) throws IOException {
logger.log("lstat", path);
mount.lstat(tostr(path), stat);
}

@Override
void fstat(int fd, CurveFSStat stat) throws IOException {
logger.log("fstat", fd);
mount.fstat(fd, stat);
}

@Override
void setattr(Path path, CurveFSStat stat, int mask) throws IOException {
logger.log("setattr", path);
mount.setattr(tostr(path), stat, mask);
}

@Override
void chmod(Path path, int mode) throws IOException {
logger.log("chmod", path, mode);
mount.chmod(tostr(path), mode);
}

@Override
void chown(Path path, int uid, int gid) throws IOException {
logger.log("chown", path, uid, gid);
mount.chown(tostr(path), uid, gid);
}

@Override
void rename(Path src, Path dst) throws IOException {
logger.log("rename", src, dst);
mount.rename(tostr(src), tostr(dst));
}
}
Loading

0 comments on commit 7cbbd41

Please sign in to comment.