Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ public static String getTempWritePath(String loc, String prefix) {
}

/**
 * Resolves the {@code TFileType} to report for this location.
 *
 * <p>When {@code normalizedLocation} is non-blank and {@code isHdfsOnOssEndpoint}
 * accepts it, the result is {@code TFileType.FILE_HDFS}; in every other case the
 * type is derived from {@code schema} via {@code SchemaTypeMapper}.
 *
 * @return the file type chosen by the rule above
 */
public TFileType getTFileTypeForBE() {
    // The blank check runs first, so the endpoint probe is skipped for blank locations.
    boolean hdfsOnOss = StringUtils.isNotBlank(normalizedLocation)
            && isHdfsOnOssEndpoint(normalizedLocation);
    return hdfsOnOss
            ? TFileType.FILE_HDFS
            : SchemaTypeMapper.fromSchemaToFileType(schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,22 @@
public class OSSHdfsProperties extends HdfsCompatibleProperties {

@Setter
@ConnectorProperty(names = {"oss.hdfs.endpoint",
"dlf.endpoint", "dlf.catalog.endpoint", "oss.endpoint"},
@ConnectorProperty(names = {"oss.hdfs.endpoint", "oss.endpoint",
"dlf.endpoint", "dlf.catalog.endpoint"},
description = "The endpoint of OSS.")
protected String endpoint = "";

@ConnectorProperty(names = {"oss.hdfs.access_key", "dlf.access_key", "dlf.catalog.accessKeyId", "oss.access_key"},
@ConnectorProperty(names = {"oss.hdfs.access_key", "oss.access_key", "dlf.access_key", "dlf.catalog.accessKeyId"},
sensitive = true,
description = "The access key of OSS.")
protected String accessKey = "";

@ConnectorProperty(names = {"oss.hdfs.secret_key", "dlf.secret_key", "dlf.catalog.secret_key", "oss.secret_key"},
@ConnectorProperty(names = {"oss.hdfs.secret_key", "oss.secret_key", "dlf.secret_key", "dlf.catalog.secret_key"},
sensitive = true,
description = "The secret key of OSS.")
protected String secretKey = "";

@ConnectorProperty(names = {"oss.hdfs.region", "dlf.region", "oss.region"},
@ConnectorProperty(names = {"oss.hdfs.region", "oss.region", "dlf.region"},
required = false,
description = "The region of OSS.")
protected String region;
Expand All @@ -85,8 +86,8 @@ public class OSSHdfsProperties extends HdfsCompatibleProperties {
description = "The security token of OSS.")
protected String securityToken = "";

private static final Set<String> OSS_ENDPOINT_KEY_NAME = ImmutableSet.of("oss.hdfs.endpoint",
"dlf.endpoint", "dlf.catalog.endpoint", "oss.endpoint");
private static final Set<String> OSS_ENDPOINT_KEY_NAME = ImmutableSet.of("oss.hdfs.endpoint", "oss.endpoint",
"dlf.endpoint", "dlf.catalog.endpoint");

private Map<String, String> backendConfigProperties;

Expand All @@ -110,21 +111,15 @@ public static boolean guessIsMe(Map<String, String> props) {
}
String endpoint = OSS_ENDPOINT_KEY_NAME.stream()
.map(props::get)
.filter(StringUtils::isNotBlank)
.filter(ep -> StringUtils.isNotBlank(ep) && ep.endsWith(OSS_HDFS_ENDPOINT_SUFFIX))
.findFirst()
.orElse(null);
if (StringUtils.isBlank(endpoint)) {
return false;
}
return endpoint.endsWith(OSS_HDFS_ENDPOINT_SUFFIX) || endpoint.contains(DLF_ENDPOINT_KEY_WORDS);
return StringUtils.isNotBlank(endpoint);
}

/**
 * Runs the superclass property checks and then verifies the configured endpoint.
 *
 * @throws IllegalArgumentException if {@code isValidEndpoint} rejects {@code endpoint}
 */
@Override
protected void checkRequiredProperties() {
    super.checkRequiredProperties();
    boolean endpointAccepted = isValidEndpoint(endpoint);
    if (endpointAccepted) {
        return;
    }
    throw new IllegalArgumentException("Property oss.endpoint is required and must be a valid OSS endpoint.");
}

private void convertDlfToOssEndpointIfNeeded() {
Expand All @@ -144,22 +139,9 @@ public static Optional<String> extractRegion(String endpoint) {
return Optional.empty();
}

/**
 * Reports whether {@code endpoint} fully matches at least one pattern in
 * {@code ENDPOINT_PATTERN}.
 *
 * <p>Matching is performed on a lower-cased copy of the input.
 *
 * @param endpoint the endpoint string to check; {@code null} is treated as invalid
 * @return {@code true} if some pattern matches the whole lower-cased endpoint,
 *         {@code false} otherwise (including for {@code null})
 */
public static boolean isValidEndpoint(String endpoint) {
    if (endpoint == null) {
        return false;
    }
    // Lower-case once, outside the loop, and with a fixed locale: the default-locale
    // variant maps "I" to a dotless "ı" under e.g. a Turkish locale, which would make
    // an otherwise valid upper-case endpoint fail to match.
    String normalized = endpoint.toLowerCase(java.util.Locale.ROOT);
    for (Pattern pattern : ENDPOINT_PATTERN) {
        if (pattern.matcher(normalized).matches()) {
            return true;
        }
    }
    return false;
}

@Override
public void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
if (!isValidEndpoint(endpoint.toLowerCase())) {
throw new IllegalArgumentException("The endpoint is not a valid OSS HDFS endpoint: " + endpoint
+ ". It should match the pattern: <region>.oss-dls.aliyuncs.com");
}
// Extract region from the endpoint, e.g., "cn-shanghai.oss-dls.aliyuncs.com" -> "cn-shanghai"
if (StringUtils.isBlank(this.region)) {
Optional<String> regionOptional = extractRegion(endpoint);
Expand All @@ -178,8 +160,6 @@ public void initNormalizeAndCheckProps() {

private static final String OSS_HDFS_ENDPOINT_SUFFIX = ".oss-dls.aliyuncs.com";

private static final String DLF_ENDPOINT_KEY_WORDS = "dlf";

@Override
public Map<String, String> getBackendConfigProperties() {
return backendConfigProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ public class OSSProperties extends AbstractS3CompatibleProperties {
* - datalake.cn-hangzhou.aliyuncs.com => region = cn-hangzhou
*/
public static final Set<Pattern> ENDPOINT_PATTERN = ImmutableSet.of(STANDARD_ENDPOINT_PATTERN,
Pattern.compile("(?:https?://)?([a-z]{2}-[a-z0-9-]+)\\.oss-dls\\.aliyuncs\\.com"),
Pattern.compile("^(?:https?://)?dlf(?:-vpc)?\\.([a-z0-9-]+)\\.aliyuncs\\.com(?:/.*)?$"),
Pattern.compile("^(?:https?://)?datalake(?:-vpc)?\\.([a-z0-9-]+)\\.aliyuncs\\.com(?:/.*)?$"));

Expand All @@ -155,6 +154,8 @@ public class OSSProperties extends AbstractS3CompatibleProperties {
private static List<String> DLF_TYPE_KEYWORDS = Arrays.asList("hive.metastore.type",
"iceberg.catalog.type", "paimon.catalog.type");

private static final String DLS_URI_KEYWORDS = "oss-dls.aliyuncs";

/**
 * Creates OSS storage properties by delegating to the superclass constructor
 * with {@code Type.OSS}.
 *
 * @param origProps property map handed to the superclass as-is
 */
protected OSSProperties(Map<String, String> origProps) {
super(Type.OSS, origProps);
}
Expand All @@ -175,6 +176,9 @@ protected static boolean guessIsMe(Map<String, String> origProps) {
.findFirst()
.orElse(null);
if (StringUtils.isNotBlank(value)) {
if (value.contains(DLS_URI_KEYWORDS)) {
return false;
}
return (value.contains("aliyuncs.com"));
}

Expand Down Expand Up @@ -203,6 +207,10 @@ private static boolean isKnownObjectStorage(String value) {
if (value == null) {
return false;
}
boolean isDls = value.contains(DLS_URI_KEYWORDS);
if (isDls) {
return false;
}
if (value.startsWith("oss://")) {
return true;
}
Expand All @@ -211,8 +219,7 @@ private static boolean isKnownObjectStorage(String value) {
}
boolean isAliyunOss = (value.contains("oss-"));
boolean isAmazonS3 = value.contains("s3.");
boolean isDls = value.contains("dls");
return isAliyunOss || isAmazonS3 || isDls;
return isAliyunOss || isAmazonS3;
}

private static boolean isDlfMSType(Map<String, String> params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,22 @@ public static StorageProperties createPrimary(Map<String, String> origProps) {
Arrays.asList(
props -> (isFsSupport(props, FS_HDFS_SUPPORT)
|| HdfsProperties.guessIsMe(props)) ? new HdfsProperties(props) : null,
props -> ((isFsSupport(props, FS_OSS_HDFS_SUPPORT)
|| isFsSupport(props, DEPRECATED_OSS_HDFS_SUPPORT))
|| OSSHdfsProperties.guessIsMe(props)) ? new OSSHdfsProperties(props) : null,
props -> {
// OSS-HDFS and OSS are mutually exclusive - check OSS-HDFS first
if ((isFsSupport(props, FS_OSS_HDFS_SUPPORT)
|| isFsSupport(props, DEPRECATED_OSS_HDFS_SUPPORT))
|| OSSHdfsProperties.guessIsMe(props)) {
return new OSSHdfsProperties(props);
}
// Only check for regular OSS if OSS-HDFS is not enabled
if (isFsSupport(props, FS_OSS_SUPPORT)
|| OSSProperties.guessIsMe(props)) {
return new OSSProperties(props);
}
return null;
},
props -> (isFsSupport(props, FS_S3_SUPPORT)
|| S3Properties.guessIsMe(props)) ? new S3Properties(props) : null,
props -> (isFsSupport(props, FS_OSS_SUPPORT)
|| OSSProperties.guessIsMe(props)) ? new OSSProperties(props) : null,
props -> (isFsSupport(props, FS_OBS_SUPPORT)
|| OBSProperties.guessIsMe(props)) ? new OBSProperties(props) : null,
props -> (isFsSupport(props, FS_COS_SUPPORT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,9 @@ public void testInvalidEndpoint() throws UserException {
origProps.put("oss.endpoint", "cn-north-2-gov-1.oss-dls.aliyuncs.com");
props = (OSSHdfsProperties) StorageProperties.createPrimary(origProps);
Assertions.assertEquals("cn-north-2-gov-1", props.getBackendConfigProperties().get("fs.oss.region"));
origProps.put("dlf.endpoint", "dlf-vpc.cn-beijing.aliyuncs.com");
props = (OSSHdfsProperties) StorageProperties.createPrimary(origProps);
Assertions.assertEquals("cn-beijing", props.getBackendConfigProperties().get("fs.oss.region"));
origProps.remove("dlf.endpoint");
props = (OSSHdfsProperties) StorageProperties.createPrimary(origProps);
Assertions.assertEquals("cn-north-2-gov-1", props.getBackendConfigProperties().get("fs.oss.region"));
origProps.put("dlf.endpoint", "dlf-vpc.ap-southeast-5.aliyuncs.com");
props = (OSSHdfsProperties) StorageProperties.createPrimary(origProps);
Assertions.assertEquals("ap-southeast-5", props.getBackendConfigProperties().get("fs.oss.region"));
origProps.put("dlf.endpoint", "dlf.us-east-1.aliyuncs.com");
props = (OSSHdfsProperties) StorageProperties.createAll(origProps).get(0);
Assertions.assertEquals("us-east-1", props.getBackendConfigProperties().get("fs.oss.region"));
}

@Test
Expand Down Expand Up @@ -167,4 +158,62 @@ public void testDefaultFS() {
Assertions.assertEquals("oss://bucket", props.getBackendConfigProperties().get("fs.defaultFS"));
}

/**
 * Supplies plain {@code oss.*} keys whose endpoint carries the {@code oss-dls} suffix,
 * together with {@code dlf.*} keys, and expects the primary storage to be OSSHDFS
 * with the default FS and region taken from the {@code oss.*} / {@code oss.hdfs.*} values.
 */
@Test
public void testOssAndOssHDFS() throws UserException {
    Map<String, String> catalogProps = new HashMap<>();
    // oss.* keys pointing at an oss-dls endpoint
    catalogProps.put("oss.endpoint", "cn-shanghai.oss-dls.aliyuncs.com");
    catalogProps.put("oss.access_key", "testAccessKey");
    catalogProps.put("oss.secret_key", "testSecretKey");
    catalogProps.put("oss.hdfs.fs.defaultFS", "oss://my-bucket");
    // dlf.* keys present alongside
    catalogProps.put("dlf.endpoint", "cn-shanghai.oss-dlf.aliyuncs.com");
    catalogProps.put("dlf.access_key", "testAccessKey");
    catalogProps.put("dlf.secret_key", "testSecretKey");

    StorageProperties primary = StorageProperties.createPrimary(catalogProps);

    Assertions.assertEquals("OSSHDFS", primary.getStorageName());
    Map<String, String> backendConf = primary.getBackendConfigProperties();
    Assertions.assertEquals("oss://my-bucket", backendConf.get("fs.defaultFS"));
    Assertions.assertEquals("cn-shanghai", backendConf.get("fs.oss.region"));
}

/**
 * Supplies only {@code dlf.*} keys plus {@code oss.hdfs.enabled=true} and expects
 * the primary storage to be reported as OSSHDFS.
 */
@Test
public void testDlfAndOssHDFS() throws UserException {
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put("dlf.endpoint", "dlf-vpc.cn-beijing.aliyuncs.com");
    catalogProps.put("dlf.access_key", "testAccessKey");
    catalogProps.put("dlf.secret_key", "testSecretKey");
    // Explicit switch rather than relying on endpoint guessing.
    catalogProps.put("oss.hdfs.enabled", "true");

    StorageProperties primary = StorageProperties.createPrimary(catalogProps);

    Assertions.assertEquals("OSSHDFS", primary.getStorageName());
}


/**
 * Builds the property map in three steps ({@code oss.hdfs.*} only, then with
 * {@code oss.*} added, then with {@code dlf.*} added) and expects the same
 * OSSHDFS storage name, region and file-system implementation after each step.
 */
@Test
public void testOssHDFSNewProps() throws UserException {
    final String expectedStorage = "OSSHDFS";
    final String expectedRegion = "cn-shanghai";
    final String expectedFsImpl = "com.aliyun.jindodata.oss.JindoOssFileSystem";

    // Step 1: only the oss.hdfs.* keys.
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put("oss.hdfs.endpoint", "cn-shanghai.oss-dls.aliyuncs.com");
    catalogProps.put("oss.hdfs.access_key", "testAccessKey");
    catalogProps.put("oss.hdfs.secret_key", "testSecretKey");
    StorageProperties primary = StorageProperties.createPrimary(catalogProps);
    Assertions.assertEquals(expectedStorage, primary.getStorageName());
    Assertions.assertEquals(expectedRegion, primary.getBackendConfigProperties().get("fs.oss.region"));
    Assertions.assertEquals(expectedFsImpl, primary.getBackendConfigProperties().get("fs.oss.impl"));

    // Step 2: plain oss.* keys are added on top of the oss.hdfs.* keys.
    catalogProps.put("oss.endpoint", "cn-shanghai.oss.aliyuncs.com");
    catalogProps.put("oss.access_key", "testAccessKey");
    catalogProps.put("oss.secret_key", "testSecretKey");
    primary = StorageProperties.createPrimary(catalogProps);
    Assertions.assertEquals(expectedStorage, primary.getStorageName());
    Assertions.assertEquals(expectedRegion, primary.getBackendConfigProperties().get("fs.oss.region"));
    Assertions.assertEquals(expectedFsImpl, primary.getBackendConfigProperties().get("fs.oss.impl"));

    // Step 3: dlf.* keys (with a different region in the endpoint) are added as well.
    catalogProps.put("dlf.endpoint", "dlf-vpc.cn-beijing.aliyuncs.com");
    catalogProps.put("dlf.access_key", "testAccessKey");
    catalogProps.put("dlf.secret_key", "testSecretKey");
    primary = StorageProperties.createPrimary(catalogProps);
    Assertions.assertEquals(expectedStorage, primary.getStorageName());
    Assertions.assertEquals(expectedRegion, primary.getBackendConfigProperties().get("fs.oss.region"));
    Assertions.assertEquals(expectedFsImpl, primary.getBackendConfigProperties().get("fs.oss.impl"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void testGetRegion() throws UserException {
origProps.put("oss.endpoint", "http://s3.oss-cn-hongkong.aliyuncs.com");
Assertions.assertEquals("cn-hongkong", ((OSSProperties) StorageProperties.createPrimary(origProps)).getRegion());
origProps.put("oss.endpoint", "https://dlf.cn-beijing.aliyuncs.com");
Assertions.assertEquals("cn-beijing", ((OSSProperties) StorageProperties.createAll(origProps).get(1)).getRegion());
Assertions.assertEquals("cn-beijing", ((OSSProperties) StorageProperties.createAll(origProps).get(0)).getRegion());
origProps.put("oss.endpoint", "datalake-vpc.cn-shenzhen.aliyuncs.com");
Assertions.assertEquals("cn-shenzhen", ((OSSProperties) StorageProperties.createPrimary(origProps)).getRegion());
origProps.put("oss.endpoint", "https://datalake-vpc.cn-shenzhen.aliyuncs.com");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ catalog_tvf_test_dlf hms dlf.catalog.id 987654321
catalog_tvf_test_dlf hms dlf.secret_key *XXX

-- !test_12 --
catalog_tvf_test_dlf hms dlf.access_key AAAAAAAAAAAAAAAAAAAAAA
catalog_tvf_test_dlf hms dlf.access_key *XXX

-- !test_13 --
catalog_tvf_test_dlf hms dlf.uid 123456789
Expand All @@ -32,7 +32,7 @@ internal internal NULL NULL
catalog_tvf_test_dlf hms dlf.secret_key *XXX

-- !test_18 --
catalog_tvf_test_dlf hms dlf.access_key AAAAAAAAAAAAAAAAAAAAAA
catalog_tvf_test_dlf hms dlf.access_key *XXX

-- !test_19 --
catalog_tvf_test_dlf hms dlf.uid 123456789
Expand Down
Loading