Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs/client: fix trash bugs #2915

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .obm.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
container_name: curve-build-playground.master
container_image: opencurvedocker/curve-base:build-debian11
container_image: opencurvedocker/curve-build:ubuntu22
6 changes: 3 additions & 3 deletions curvefs/sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ How to build

``` bash
$ git clone [email protected]:opencurve/curve.git
$ cd curve
$ make dep stor=fs
$ make playground
$ make ci-dep stor=fs
$ make sdk
```

It will generate a jar after build success:
It will generate a jar package after build success:

```
Build SDK success => /curve/curvefs/sdk/output/curvefs-hadoop-1.0-SNAPSHOT.jar
Expand Down
2 changes: 2 additions & 0 deletions curvefs/sdk/java/native/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ cc_binary(
copts = CURVE_DEFAULT_COPTS,
linkopts = [
"-Wl,-rpath=/tmp/libcurvefs,--disable-new-dtags",
"-L/usr/lib/x86_64-linux-gnu/",
"-lhashkit",
],
deps = [
"@com_google_absl//absl/cleanup",
Expand Down
18 changes: 18 additions & 0 deletions curvefs/sdk/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,25 @@
<resource>
<directory>native/build</directory>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2023 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opencurve.curve.fs.flink;

import io.opencurve.curve.fs.hadoop.CurveFileSystem;
Expand All @@ -10,28 +26,17 @@
import java.net.URI;

public class CurveFileSystemFactory implements FileSystemFactory {
private org.apache.hadoop.conf.Configuration conf;

private org.apache.hadoop.conf.Configuration conf = new Configuration();
private static final String CURVE_FS_CONFIG_PREFIXES = "curvefs.";
private static final String FLINK_CONFIG_PREFIXES = "fs.";
public static String SCHEME = "curvefs";

@Override
public void configure(org.apache.flink.configuration.Configuration config) {
conf = new Configuration();
if (config != null) {
for (String key : config.keySet()) {
if (key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES)) {
String value = config.getString(key, null);
if (value != null) {
if (CurveFileSystem.class.getCanonicalName().equals(value.trim())) {
SCHEME = key.split("\\.")[1];
}
conf.set(key, value);
}
}
}
}
config.keySet()
.stream()
.filter(key -> key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES))
.forEach(key -> conf.set(key, config.getString(key, "")));
}

@Override
Expand All @@ -45,4 +50,4 @@ public FileSystem create(URI uri) throws IOException {
fs.initialize(uri, conf);
return new HadoopFileSystem(fs);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2023 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opencurve.curve.fs.flink;

import org.apache.flink.connector.file.table.FileSystemTableFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public class CurveFSInputStream extends FSInputStream {
*/
public CurveFSInputStream(Configuration conf, CurveFSProto curvefs,
int fh, long flength, int bufferSize) {
// Whoever's calling the constructor is responsible for doing the actual curve_open
// call and providing the file handle.
fileLength = flength;
fileHandle = fh;
closed = false;
Expand All @@ -73,6 +71,7 @@ public CurveFSInputStream(Configuration conf, CurveFSProto curvefs,
/** Curve likes things to be closed before it shuts down,
* so closing the IOStream stuff voluntarily in a finalizer is good
*/
@Override
protected void finalize() throws Throwable {
try {
if (!closed) {
Expand All @@ -91,7 +90,6 @@ private synchronized boolean fillBuffer() throws IOException {

bufValid = 0;

// attempt to reset to old position. If it fails, too bad.
curve.lseek(fileHandle, curvePos, CurveFSMount.SEEK_SET);
throw new IOException("Failed to fill read buffer! Error code:" + err);
}
Expand All @@ -102,6 +100,7 @@ private synchronized boolean fillBuffer() throws IOException {
/*
* Get the current position of the stream.
*/
@Override
public synchronized long getPos() throws IOException {
return curvePos - bufValid + bufPos;
}
Expand All @@ -117,6 +116,7 @@ public synchronized int available() throws IOException {
return (int) (fileLength - getPos());
}

@Override
public synchronized void seek(long targetPos) throws IOException {
LOG.trace("CurveInputStream.seek: Seeking to position " + targetPos + " on fd "
+ fileHandle);
Expand All @@ -142,6 +142,7 @@ public synchronized void seek(long targetPos) throws IOException {
* they'll be dealt with before anybody even tries to call this method!
* @return false.
*/
@Override
public synchronized boolean seekToNewSource(long targetPos) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.opencurve.curve.fs.libfs.CurveFSStat;
import io.opencurve.curve.fs.libfs.CurveFSStatVFS;

import java.util.UUID;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
Expand All @@ -37,6 +38,7 @@
class CurveFSTalker extends CurveFSProto {
private CurveFSMount mount;
private String fsname = null;
private String mountpoint = null;
private boolean inited = false;

private static final String PREFIX_KEY = "curvefs";
Expand Down Expand Up @@ -72,14 +74,15 @@ void initialize(URI uri, Configuration conf) throws IOException {
if (null == fsname || fsname.isEmpty()) {
throw new IOException("curvefs.name is not set");
}
mount.mount(fsname, "/");
mountpoint = UUID.randomUUID().toString();
mount.mount(fsname, mountpoint);
inited = true;
}

@Override
void shutdown() throws IOException {
if (inited) {
mount.umount(fsname, "/");
mount.umount(fsname, mountpoint);
mount = null;
inited = false;
}
Expand Down Expand Up @@ -179,4 +182,4 @@ void chown(Path path, int uid, int gid) throws IOException {
void rename(Path src, Path dst) throws IOException {
mount.rename(tostr(src), tostr(dst));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ private Path makeAbsolute(Path path) {
return new Path(workingDir, path);
}

@Override
public URI getUri() {
return uri;
}

@Override
public String getScheme() {
return uri.getScheme();
return "hdfs";
}

@Override
Expand All @@ -85,14 +87,12 @@ public void initialize(URI uri, Configuration conf) throws IOException {
this.workingDir = getHomeDirectory();
}


@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
path = makeAbsolute(path);

// throws filenotfoundexception if path is a directory
int fd = curve.open(path, CurveFSMount.O_RDONLY, 0);

/* get file size */
CurveFSStat stat = new CurveFSStat();
curve.fstat(fd, stat);

Expand All @@ -102,10 +102,11 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {

@Override
public void close() throws IOException {
super.close(); // this method does stuff, make sure it's run!
super.close();
curve.shutdown();
}

@Override
public FSDataOutputStream append(Path path, int bufferSize, Progressable progress) throws IOException {
path = makeAbsolute(path);

Expand All @@ -122,6 +123,7 @@ public FSDataOutputStream append(Path path, int bufferSize, Progressable progres
return new FSDataOutputStream(ostream, statistics);
}

@Override
public Path getWorkingDirectory() {
return workingDir;
}
Expand All @@ -144,6 +146,7 @@ public boolean mkdirs(Path f) throws IOException {
return mkdirs(f, perms);
}

@Override
public FileStatus getFileStatus(Path path) throws IOException {
path = makeAbsolute(path);

Expand All @@ -160,7 +163,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
return status;
}


@Override
public FileStatus[] listStatus(Path path) throws IOException {
path = makeAbsolute(path);

Expand All @@ -174,12 +177,10 @@ public FileStatus[] listStatus(Path path) throws IOException {
for (int i = 0; i < status.length; i++) {
status[i] = getFileStatus(new Path(path, dirlist[i]));
}
curve.shutdown();
return status;
} else {
throw new FileNotFoundException("File " + path + " does not exist.");
}

}

@Override
Expand Down Expand Up @@ -208,9 +209,9 @@ public void setTimes(Path path, long mtime, long atime) throws IOException {
curve.setattr(path, stat, mask);
}

@Override
public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress) throws IOException {

path = makeAbsolute(path);

boolean exists = exists(path);
Expand Down Expand Up @@ -268,6 +269,7 @@ public void setOwner(Path path, String username, String groupname) throws IOExce
}

@Deprecated
@Override
public FSDataOutputStream createNonRecursive(Path path, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Expand All @@ -278,7 +280,7 @@ public FSDataOutputStream createNonRecursive(Path path, FsPermission permission,

if (parent != null) {
CurveFSStat stat = new CurveFSStat();
curve.lstat(parent, stat); // handles FileNotFoundException case
curve.lstat(parent, stat);
if (stat.isFile()) {
throw new FileAlreadyExistsException(parent.toString());
}
Expand Down Expand Up @@ -314,28 +316,27 @@ public boolean rename(Path src, Path dst) throws IOException {
}

@Deprecated
@Override
public boolean delete(Path path) throws IOException {
return delete(path, false);
}

@Override
public boolean delete(Path path, boolean recursive) throws IOException {
path = makeAbsolute(path);

/* path exists? */
FileStatus status;
try {
status = getFileStatus(path);
} catch (FileNotFoundException e) {
return false;
}

/* we're done if its a file */
if (status.isFile()) {
curve.unlink(path);
return true;
}

/* get directory contents */
FileStatus[] dirlist = listStatus(path);
if (dirlist == null) {
return false;
Expand Down Expand Up @@ -383,6 +384,6 @@ protected int getDefaultPort() {

@Override
public String getCanonicalServiceName() {
return null; // Does not support Token
return null;
}
}

This file was deleted.

Loading