Skip to content

Commit

Permalink
curvefs/sdk: log call stack to stdout for hadoop filesystem on debug …
Browse files Browse the repository at this point in the history
…mode.

Signed-off-by: Wine93 <[email protected]>
  • Loading branch information
Wine93 committed Nov 1, 2023
1 parent 8118cdb commit 00f06b1
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.opencurve.curve.fs.common;

import java.text.SimpleDateFormat;
import java.util.Date;

public class StackLogger {
private String classname;
private int level;

private static final String CURVEFS_DEBUG_ENV_VAR = "CURVEFS_DEBUG";

public StackLogger(String classname, int level) {
this.classname = classname;
this.level = level;
}

private String now() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
return formatter.format(new Date());
}

public void log(String fn, Object... args) {
String value = System.getenv(CURVEFS_DEBUG_ENV_VAR);
if (!Boolean.valueOf(value)) {
return;
}

String[] params = new String[args.length];
for (int i = 0; i < args.length; i++) {
params[i] = args[i].toString();
}

String indent = " ".repeat(this.level * 4);
String param = String.join(",", params);
String message = String.format("+ [%s] %s%s.%s(%s)", now(), indent, this.classname, fn, param);
System.out.println(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
import io.opencurve.curve.fs.libfs.CurveFSMount;
import io.opencurve.curve.fs.common.StackLogger;

import java.io.IOException;

Expand All @@ -37,6 +38,7 @@
*/
public class CurveFSInputStream extends FSInputStream {
private static final Log LOG = LogFactory.getLog(CurveFSInputStream.class);
private static final StackLogger logger = new StackLogger("CurveFSInputStream", 0);
private boolean closed;

private int fileHandle;
Expand All @@ -59,21 +61,25 @@ public class CurveFSInputStream extends FSInputStream {
*/
public CurveFSInputStream(Configuration conf, CurveFSProto curvefs,
int fh, long flength, int bufferSize) {
// Whoever's calling the constructor is responsible for doing the actual curve_open
// call and providing the file handle.
fileLength = flength;
fileHandle = fh;
closed = false;
curve = curvefs;
buffer = new byte[1<<21];
LOG.debug("CurveInputStream constructor: initializing stream with fh "
+ fh + " and file length " + flength);
logger.log("CurveFSInputStream");

// Whoever's calling the constructor is responsible for doing the actual curve_open
// call and providing the file handle.
fileLength = flength;
fileHandle = fh;
closed = false;
curve = curvefs;
buffer = new byte[1<<21];
LOG.debug("CurveInputStream constructor: initializing stream with fh "
+ fh + " and file length " + flength);
}

/** Curve likes things to be closed before it shuts down,
* so closing the IOStream stuff voluntarily in a finalizer is good
*/
protected void finalize() throws Throwable {
logger.log("finalize");

try {
if (!closed) {
close();
Expand All @@ -84,6 +90,8 @@ protected void finalize() throws Throwable {
}

private synchronized boolean fillBuffer() throws IOException {
logger.log("fillBuffer");

bufValid = curve.read(fileHandle, buffer, buffer.length, -1);
bufPos = 0;
if (bufValid < 0) {
Expand Down Expand Up @@ -118,6 +126,8 @@ public synchronized int available() throws IOException {
}

public synchronized void seek(long targetPos) throws IOException {
logger.log("seek", targetPos);

LOG.trace("CurveInputStream.seek: Seeking to position " + targetPos + " on fd "
+ fileHandle);
if (targetPos > fileLength) {
Expand All @@ -143,6 +153,7 @@ public synchronized void seek(long targetPos) throws IOException {
* @return false.
*/
public synchronized boolean seekToNewSource(long targetPos) {
logger.log("seekToNewSource", targetPos);
return false;
}

Expand All @@ -152,6 +163,7 @@ public synchronized boolean seekToNewSource(long targetPos) {
*/
@Override
public synchronized int read() throws IOException {
logger.log("read");
LOG.trace(
"CurveInputStream.read: Reading a single byte from fd " + fileHandle
+ " by calling general read function");
Expand Down Expand Up @@ -183,6 +195,8 @@ public synchronized int read() throws IOException {
*/
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
logger.log("read", off, len);

LOG.trace(
"CurveInputStream.read: Reading " + len + " bytes from fd " + fileHandle);

Expand Down Expand Up @@ -241,6 +255,8 @@ public synchronized int read(byte buf[], int off, int len) throws IOException {
*/
@Override
public void close() throws IOException {
logger.log("close");

LOG.trace("CurveOutputStream.close:enter");
if (!closed) {
curve.close(fileHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@

package io.opencurve.curve.fs.hadoop;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import io.opencurve.curve.fs.common.StackLogger;
import io.opencurve.curve.fs.libfs.CurveFSMount;

import java.io.IOException;
Expand All @@ -42,6 +41,8 @@
* much more.
*/
public class CurveFSOutputStream extends OutputStream {
private static final StackLogger logger = new StackLogger("CurveFSOutputStream", 0);

private boolean closed;

private CurveFSProto curve;
Expand All @@ -58,6 +59,7 @@ public class CurveFSOutputStream extends OutputStream {
*/
public CurveFSOutputStream(Configuration conf, CurveFSProto curvefs,
int fh, int bufferSize) {
logger.log("CurveFSOutputStream", fh, bufferSize);
curve = curvefs;
fileHandle = fh;
closed = false;
Expand All @@ -68,6 +70,8 @@ public CurveFSOutputStream(Configuration conf, CurveFSProto curvefs,
* Close the Curve file handle if close() wasn't explicitly called.
*/
protected void finalize() throws Throwable {
logger.log("finalize");

try {
if (!closed) {
close();
Expand All @@ -91,19 +95,23 @@ private synchronized void checkOpen() throws IOException {
* @return The file offset in bytes.
*/
public synchronized long getPos() throws IOException {
logger.log("getPos");
checkOpen();
return curve.lseek(fileHandle, 0, CurveFSMount.SEEK_CUR);
}

@Override
public synchronized void write(int b) throws IOException {
logger.log("write", b);

byte buf[] = new byte[1];
buf[0] = (byte) b;
write(buf, 0, 1);
}

@Override
public synchronized void write(byte buf[], int off, int len) throws IOException {
logger.log("write", off, len);
checkOpen();

while (len > 0) {
Expand Down Expand Up @@ -159,13 +167,16 @@ private synchronized void flushBuffer() throws IOException {

@Override
public synchronized void flush() throws IOException {
logger.log("flush");

checkOpen();
flushBuffer(); // buffer -> libcurvefs
curve.fsync(fileHandle); // libcurvefs -> cluster
}

@Override
public synchronized void close() throws IOException {
logger.log("close");
checkOpen();
flush();
curve.close(fileHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import io.opencurve.curve.fs.libfs.CurveFSMount;
import io.opencurve.curve.fs.libfs.CurveFSStat;
import io.opencurve.curve.fs.libfs.CurveFSStatVFS;
import io.opencurve.curve.fs.common.StackLogger;

import java.util.UUID;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
Expand All @@ -37,9 +39,11 @@
class CurveFSTalker extends CurveFSProto {
private CurveFSMount mount;
private String fsname = null;
private String mountpoint = null;
private boolean inited = false;

private static final String PREFIX_KEY = "curvefs";
private static final StackLogger logger = new StackLogger("CurveFSTalker", 1);

CurveFSTalker(Configuration conf, Log log) {
mount = null;
Expand Down Expand Up @@ -67,36 +71,44 @@ private void loadCfg(Configuration conf) {

@Override
void initialize(URI uri, Configuration conf) throws IOException {
logger.log("initialize", uri);

mount = new CurveFSMount();
loadCfg(conf);
if (null == fsname || fsname.isEmpty()) {
throw new IOException("curvefs.name is not set");
}
mount.mount(fsname, "/");
mountpoint = UUID.randomUUID().toString();
mount.mount(fsname, mountpoint);
inited = true;
}

@Override
void shutdown() throws IOException {
logger.log("shutdown");

if (inited) {
mount.umount(fsname, "/");
mount.umount(fsname, mountpoint);
mount = null;
inited = false;
}
}

@Override
void mkdirs(Path path, int mode) throws IOException {
logger.log("mkdirs");
mount.mkdirs(tostr(path), mode);
}

@Override
void rmdir(Path path) throws IOException {
logger.log("rmdir");
mount.rmdir(tostr(path));
}

@Override
String[] listdir(Path path) throws IOException {
logger.log("listdir", path);
CurveFSStat stat = new CurveFSStat();
try {
mount.lstat(tostr(path), stat);
Expand All @@ -112,71 +124,85 @@ String[] listdir(Path path) throws IOException {

@Override
int open(Path path, int flags, int mode) throws IOException {
logger.log("open", path, flags, mode);
return mount.open(tostr(path), flags, mode);
}

@Override
long lseek(int fd, long offset, int whence) throws IOException {
logger.log("lseek", fd, offset, whence);
return mount.lseek(fd, offset, whence);
}

@Override
int write(int fd, byte[] buf, long size, long offset) throws IOException {
logger.log("write", fd, size, offset);
return mount.write(fd, buf, size, offset);
}

@Override
int read(int fd, byte[] buf, long size, long offset) throws IOException {
logger.log("read", fd, size, offset);
return mount.read(fd, buf, size, offset);
}

@Override
void fsync(int fd) throws IOException {
logger.log("fsync", fd);
mount.fsync(fd);
}

@Override
void close(int fd) throws IOException {
logger.log("close", fd);
mount.close(fd);
}

@Override
void unlink(Path path) throws IOException {
logger.log("unlink", path);
mount.unlink(tostr(path));
}

@Override
void statfs(Path path, CurveFSStatVFS stat) throws IOException {
logger.log("statfs", path);
mount.statfs(tostr(path), stat);
}

@Override
void lstat(Path path, CurveFSStat stat) throws IOException {
logger.log("lstat", path);
mount.lstat(tostr(path), stat);
}

@Override
void fstat(int fd, CurveFSStat stat) throws IOException {
logger.log("fstat", fd);
mount.fstat(fd, stat);
}

@Override
void setattr(Path path, CurveFSStat stat, int mask) throws IOException {
logger.log("setattr", path);
mount.setattr(tostr(path), stat, mask);
}

@Override
void chmod(Path path, int mode) throws IOException {
logger.log("chmod", path, mode);
mount.chmod(tostr(path), mode);
}

@Override
void chown(Path path, int uid, int gid) throws IOException {
logger.log("chown", path, uid, gid);
mount.chown(tostr(path), uid, gid);
}

@Override
void rename(Path src, Path dst) throws IOException {
logger.log("rename", src, dst);
mount.rename(tostr(src), tostr(dst));
}
}
Loading

0 comments on commit 00f06b1

Please sign in to comment.