Skip to content

Commit

Permalink
curvefs/sdk: add maven build plugin,optimize the logic of parsing inp…
Browse files Browse the repository at this point in the history
…ut parameters in flink

Signed-off-by: fine97 <[email protected]>
  • Loading branch information
fine97 authored and Wine93 committed Nov 23, 2023
1 parent 2f601bd commit 7904502
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 46 deletions.
15 changes: 15 additions & 0 deletions curvefs/sdk/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@
<directory>src/main/resources</directory>
</resource>
</resources>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2023 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opencurve.curve.fs.flink;

import io.opencurve.curve.fs.hadoop.CurveFileSystem;
Expand All @@ -10,28 +26,17 @@
import java.net.URI;

public class CurveFileSystemFactory implements FileSystemFactory {
private org.apache.hadoop.conf.Configuration conf;

private org.apache.hadoop.conf.Configuration conf = new Configuration();
private static final String CURVE_FS_CONFIG_PREFIXES = "curvefs.";
private static final String FLINK_CONFIG_PREFIXES = "fs.";
public static String SCHEME = "curvefs";

@Override
public void configure(org.apache.flink.configuration.Configuration config) {
conf = new Configuration();
if (config != null) {
for (String key : config.keySet()) {
if (key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES)) {
String value = config.getString(key, null);
if (value != null) {
if (CurveFileSystem.class.getCanonicalName().equals(value.trim())) {
SCHEME = key.split("\\.")[1];
}
conf.set(key, value);
}
}
}
}
config.keySet()
.stream()
.filter(key -> key.startsWith(CURVE_FS_CONFIG_PREFIXES) || key.startsWith(FLINK_CONFIG_PREFIXES))
.forEach(key -> conf.set(key, config.getString(key, "")));
}

@Override
Expand All @@ -45,4 +50,4 @@ public FileSystem create(URI uri) throws IOException {
fs.initialize(uri, conf);
return new HadoopFileSystem(fs);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2023 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opencurve.curve.fs.flink;

import org.apache.flink.connector.file.table.FileSystemTableFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public class CurveFSInputStream extends FSInputStream {
*/
public CurveFSInputStream(Configuration conf, CurveFSProto curvefs,
int fh, long flength, int bufferSize) {
// Whoever's calling the constructor is responsible for doing the actual curve_open
// call and providing the file handle.
fileLength = flength;
fileHandle = fh;
closed = false;
Expand All @@ -73,6 +71,7 @@ public CurveFSInputStream(Configuration conf, CurveFSProto curvefs,
/** Curve likes things to be closed before it shuts down,
* so closing the IOStream stuff voluntarily in a finalizer is good
*/
@Override
protected void finalize() throws Throwable {
try {
if (!closed) {
Expand All @@ -91,7 +90,6 @@ private synchronized boolean fillBuffer() throws IOException {

bufValid = 0;

// attempt to reset to old position. If it fails, too bad.
curve.lseek(fileHandle, curvePos, CurveFSMount.SEEK_SET);
throw new IOException("Failed to fill read buffer! Error code:" + err);
}
Expand All @@ -102,6 +100,7 @@ private synchronized boolean fillBuffer() throws IOException {
/*
* Get the current position of the stream.
*/
@Override
public synchronized long getPos() throws IOException {
return curvePos - bufValid + bufPos;
}
Expand All @@ -117,6 +116,7 @@ public synchronized int available() throws IOException {
return (int) (fileLength - getPos());
}

@Override
public synchronized void seek(long targetPos) throws IOException {
LOG.trace("CurveInputStream.seek: Seeking to position " + targetPos + " on fd "
+ fileHandle);
Expand All @@ -142,6 +142,7 @@ public synchronized void seek(long targetPos) throws IOException {
* they'll be dealt with before anybody even tries to call this method!
* @return false.
*/
@Override
public synchronized boolean seekToNewSource(long targetPos) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.opencurve.curve.fs.libfs.CurveFSStat;
import io.opencurve.curve.fs.libfs.CurveFSStatVFS;

import java.util.UUID;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
Expand All @@ -37,6 +38,7 @@
class CurveFSTalker extends CurveFSProto {
private CurveFSMount mount;
private String fsname = null;
private String mountpoint = null;
private boolean inited = false;

private static final String PREFIX_KEY = "curvefs";
Expand Down Expand Up @@ -72,14 +74,15 @@ void initialize(URI uri, Configuration conf) throws IOException {
if (null == fsname || fsname.isEmpty()) {
throw new IOException("curvefs.name is not set");
}
mount.mount(fsname, "/");
mountpoint = UUID.randomUUID().toString();
mount.mount(fsname, mountpoint);
inited = true;
}

@Override
void shutdown() throws IOException {
if (inited) {
mount.umount(fsname, "/");
mount.umount(fsname, mountpoint);
mount = null;
inited = false;
}
Expand Down Expand Up @@ -179,4 +182,4 @@ void chown(Path path, int uid, int gid) throws IOException {
void rename(Path src, Path dst) throws IOException {
mount.rename(tostr(src), tostr(dst));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ private Path makeAbsolute(Path path) {
return new Path(workingDir, path);
}

@Override
public URI getUri() {
return uri;
}

@Override
public String getScheme() {
return uri.getScheme();
return "hdfs";
}

@Override
Expand All @@ -85,14 +87,12 @@ public void initialize(URI uri, Configuration conf) throws IOException {
this.workingDir = getHomeDirectory();
}


@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
path = makeAbsolute(path);

// throws filenotfoundexception if path is a directory
int fd = curve.open(path, CurveFSMount.O_RDONLY, 0);

/* get file size */
CurveFSStat stat = new CurveFSStat();
curve.fstat(fd, stat);

Expand All @@ -102,10 +102,11 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {

@Override
public void close() throws IOException {
super.close(); // this method does stuff, make sure it's run!
super.close();
curve.shutdown();
}

@Override
public FSDataOutputStream append(Path path, int bufferSize, Progressable progress) throws IOException {
path = makeAbsolute(path);

Expand All @@ -122,6 +123,7 @@ public FSDataOutputStream append(Path path, int bufferSize, Progressable progres
return new FSDataOutputStream(ostream, statistics);
}

@Override
public Path getWorkingDirectory() {
return workingDir;
}
Expand All @@ -144,6 +146,7 @@ public boolean mkdirs(Path f) throws IOException {
return mkdirs(f, perms);
}

@Override
public FileStatus getFileStatus(Path path) throws IOException {
path = makeAbsolute(path);

Expand All @@ -160,7 +163,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
return status;
}


@Override
public FileStatus[] listStatus(Path path) throws IOException {
path = makeAbsolute(path);

Expand All @@ -174,12 +177,10 @@ public FileStatus[] listStatus(Path path) throws IOException {
for (int i = 0; i < status.length; i++) {
status[i] = getFileStatus(new Path(path, dirlist[i]));
}
curve.shutdown();
return status;
} else {
throw new FileNotFoundException("File " + path + " does not exist.");
}

}

@Override
Expand Down Expand Up @@ -208,9 +209,9 @@ public void setTimes(Path path, long mtime, long atime) throws IOException {
curve.setattr(path, stat, mask);
}

@Override
public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress) throws IOException {

path = makeAbsolute(path);

boolean exists = exists(path);
Expand Down Expand Up @@ -268,6 +269,7 @@ public void setOwner(Path path, String username, String groupname) throws IOExce
}

@Deprecated
@Override
public FSDataOutputStream createNonRecursive(Path path, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Expand All @@ -278,7 +280,7 @@ public FSDataOutputStream createNonRecursive(Path path, FsPermission permission,

if (parent != null) {
CurveFSStat stat = new CurveFSStat();
curve.lstat(parent, stat); // handles FileNotFoundException case
curve.lstat(parent, stat);
if (stat.isFile()) {
throw new FileAlreadyExistsException(parent.toString());
}
Expand Down Expand Up @@ -314,28 +316,27 @@ public boolean rename(Path src, Path dst) throws IOException {
}

@Deprecated
@Override
public boolean delete(Path path) throws IOException {
return delete(path, false);
}

@Override
public boolean delete(Path path, boolean recursive) throws IOException {
path = makeAbsolute(path);

/* path exists? */
FileStatus status;
try {
status = getFileStatus(path);
} catch (FileNotFoundException e) {
return false;
}

/* we're done if its a file */
if (status.isFile()) {
curve.unlink(path);
return true;
}

/* get directory contents */
FileStatus[] dirlist = listStatus(path);
if (dirlist == null) {
return false;
Expand Down Expand Up @@ -383,6 +384,6 @@ protected int getDefaultPort() {

@Override
public String getCanonicalServiceName() {
return null; // Does not support Token
return null;
}
}

This file was deleted.

4 changes: 2 additions & 2 deletions curvefs/sdk/libcurvefs/libcurvefs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ int curvefs_open(uintptr_t instance_ptr,
}
}

uint64_t fd;
uint64_t fd = 0;
rc = mount->vfs->Open(path, flags, mode, &fd);
if (rc != CURVEFS_ERROR::OK) {
return SysErr(rc);
Expand All @@ -164,7 +164,7 @@ ssize_t curvefs_read(uintptr_t instance_ptr,
int fd,
char* buffer,
size_t count) {
size_t nread;
size_t nread = 0;
auto mount = get_instance(instance_ptr);
auto rc = mount->vfs->Read(fd, buffer, count, &nread);
if (rc != CURVEFS_ERROR::OK) {
Expand Down

0 comments on commit 7904502

Please sign in to comment.