From 002bf5b3d8459825f52a0a09cf801f9fcc96bbd2 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 16 Jan 2025 17:37:14 +0800
Subject: [PATCH 1/5] support iceberg connection

---
 .../src/connector_common/connection.rs       | 173 +++++++++++++++++-
 1 file changed, 169 insertions(+), 4 deletions(-)

diff --git a/src/connector/src/connector_common/connection.rs b/src/connector/src/connector_common/connection.rs
index 9be0c7ac84fe2..e953e3a12606b 100644
--- a/src/connector/src/connector_common/connection.rs
+++ b/src/connector/src/connector_common/connection.rs
@@ -12,22 +12,30 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::time::Duration;
 
+use anyhow::Context;
+use opendal::services::{Gcs, S3};
+use opendal::Operator;
 use rdkafka::consumer::{BaseConsumer, Consumer};
 use rdkafka::ClientConfig;
+use risingwave_common::bail;
 use risingwave_common::secret::LocalSecretManager;
 use risingwave_pb::catalog::PbConnection;
 use serde_derive::Deserialize;
 use serde_with::serde_as;
 use tonic::async_trait;
+use url::Url;
 use with_options::WithOptions;
 
-use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};
+use crate::connector_common::{
+    AwsAuthProps, IcebergCommon, KafkaConnectionProps, KafkaPrivateLinkCommon,
+};
 use crate::error::ConnectorResult;
 use crate::schema::schema_registry::Client as ConfluentSchemaRegistryClient;
 use crate::source::kafka::{KafkaContextCommon, RwConsumerContext};
-use crate::{dispatch_connection_impl, ConnectionImpl};
+use crate::{deserialize_optional_bool_from_string, dispatch_connection_impl, ConnectionImpl};
 
 pub const SCHEMA_REGISTRY_CONNECTION_TYPE: &str = "schema_registry";
 
@@ -109,12 +117,169 @@ impl KafkaConnection {
 #[serde_as]
 #[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
 #[serde(deny_unknown_fields)]
-pub struct IcebergConnection {}
+pub struct IcebergConnection {
+    #[serde(rename = "catalog.type")]
+    pub catalog_type: Option<String>,
+    #[serde(rename = "s3.region")]
+    pub region: Option<String>,
+    #[serde(rename = "s3.endpoint")]
+    pub endpoint: Option<String>,
+    #[serde(rename = "s3.access.key")]
+    pub access_key: Option<String>,
+    #[serde(rename = "s3.secret.key")]
+    pub secret_key: Option<String>,
+
+    #[serde(rename = "gcs.credential")]
+    pub gcs_credential: Option<String>,
+
+    /// Path of the Iceberg warehouse; only applicable to the storage catalog.
+    #[serde(rename = "warehouse.path")]
+    pub warehouse_path: Option<String>,
+    /// Catalog name; can be omitted for the storage catalog, but
+    /// must be set for other catalogs.
+    #[serde(rename = "catalog.name")]
+    pub catalog_name: Option<String>,
+    /// URI of the Iceberg catalog; only applicable to the rest catalog.
+    #[serde(rename = "catalog.uri")]
+    pub catalog_uri: Option<String>,
+    /// Credential for accessing the Iceberg catalog; only applicable to the rest catalog.
+    /// A credential to exchange for a token in the OAuth2 client credentials flow.
+    #[serde(rename = "catalog.credential")]
+    pub credential: Option<String>,
+    /// Token for accessing the Iceberg catalog; only applicable to the rest catalog.
+    /// A Bearer token which will be used for interaction with the server.
+    #[serde(rename = "catalog.token")]
+    pub token: Option<String>,
+    /// `oauth2-server-uri` for accessing the Iceberg catalog; only applicable to the rest catalog.
+    /// Token endpoint URI to fetch a token from if the rest catalog is not the authorization server.
+    #[serde(rename = "catalog.oauth2-server-uri")]
+    pub oauth2_server_uri: Option<String>,
+    /// Scope for accessing the Iceberg catalog; only applicable to the rest catalog.
+    /// Additional scope for OAuth2.
+    #[serde(rename = "catalog.scope")]
+    pub scope: Option<String>,
+
+    #[serde(
+        rename = "s3.path.style.access",
+        default,
+        deserialize_with = "deserialize_optional_bool_from_string"
+    )]
+    pub path_style_access: Option<bool>,
+
+    #[serde(rename = "catalog.jdbc.user")]
+    pub jdbc_user: Option<String>,
+
+    #[serde(rename = "catalog.jdbc.password")]
+    pub jdbc_password: Option<String>,
+}
 
 #[async_trait]
 impl Connection for IcebergConnection {
     async fn test_connection(&self) -> ConnectorResult<()> {
-        todo!()
+        let info = match &self.warehouse_path {
+            Some(warehouse_path) => {
+                let url = Url::parse(warehouse_path);
+                if url.is_err()
+                    && let Some(catalog_type) = &self.catalog_type
+                    && catalog_type == "rest"
+                {
+                    // If the warehouse path is not a valid URL, it could be a warehouse name
+                    // in a rest catalog, so we allow it to pass here.
+                    None
+                } else {
+                    let url =
+                        url.with_context(|| format!("Invalid warehouse path: {}", warehouse_path))?;
+                    let bucket = url
+                        .host_str()
+                        .with_context(|| {
+                            format!("Invalid s3 path: {}, bucket is missing", warehouse_path)
+                        })?
+                        .to_owned();
+                    let root = url.path().trim_start_matches('/').to_owned();
+                    Some((url.scheme().to_owned(), bucket, root))
+                }
+            }
+            None => {
+                if let Some(catalog_type) = &self.catalog_type
+                    && catalog_type == "rest"
+                {
+                    None
+                } else {
+                    bail!("`warehouse.path` must be set");
+                }
+            }
+        };
+
+        // Test the object storage.
+        if let Some((scheme, bucket, root)) = info {
+            match scheme.as_str() {
+                "s3" | "s3a" => {
+                    let mut builder = S3::default();
+                    if let Some(region) = &self.region {
+                        builder = builder.region(region);
+                    }
+                    if let Some(endpoint) = &self.endpoint {
+                        builder = builder.endpoint(endpoint);
+                    }
+                    if let Some(access_key) = &self.access_key {
+                        builder = builder.access_key_id(access_key);
+                    }
+                    if let Some(secret_key) = &self.secret_key {
+                        builder = builder.secret_access_key(secret_key);
+                    }
+                    builder = builder.root(root.as_str()).bucket(bucket.as_str());
+                    let op = Operator::new(builder)?.finish();
+                    op.check().await?;
+                }
+                "gs" | "gcs" => {
+                    let mut builder = Gcs::default();
+                    if let Some(credential) = &self.gcs_credential {
+                        builder = builder.credential(credential);
+                    }
+                    builder = builder.root(root.as_str()).bucket(bucket.as_str());
+                    let op = Operator::new(builder)?.finish();
+                    op.check().await?;
+                }
+                _ => {
+                    bail!("Unsupported scheme: {}", scheme);
+                }
+            }
+        }
+
+        // Test the catalog.
+        let iceberg_common = IcebergCommon {
+            catalog_type: self.catalog_type.clone(),
+            region: self.region.clone(),
+            endpoint: self.endpoint.clone(),
+            access_key: self.access_key.clone(),
+            secret_key: self.secret_key.clone(),
+            gcs_credential: self.gcs_credential.clone(),
+            warehouse_path: self.warehouse_path.clone(),
+            catalog_name: self.catalog_name.clone(),
+            catalog_uri: self.catalog_uri.clone(),
+            credential: self.credential.clone(),
+            token: self.token.clone(),
+            oauth2_server_uri: self.oauth2_server_uri.clone(),
+            scope: self.scope.clone(),
+            path_style_access: self.path_style_access.clone(),
+            database_name: Some("test_database".to_owned()),
+            table_name: "table_name".to_owned(),
+            enable_config_load: Some(false),
+        };
+
+        let mut java_map = HashMap::new();
+        if let Some(jdbc_user) = &self.jdbc_user {
+            java_map.insert("jdbc.user".to_owned(), jdbc_user.to_owned());
+        }
+        if let Some(jdbc_password) = &self.jdbc_password {
+            java_map.insert("jdbc.password".to_owned(), jdbc_password.to_owned());
+        }
+        let catalog = iceberg_common.create_catalog(&java_map).await?;
+        // Test the catalog via the `table_exists` API.
+        catalog
+            .table_exists(&iceberg_common.full_table_name()?)
+            .await?;
+        Ok(())
     }
 }
 

From 39365e7f792336ab8986fc7cff6b83a05e1faebf Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 16 Jan 2025 18:10:20 +0800
Subject: [PATCH 2/5] support iceberg connection

---
 ci/scripts/e2e-iceberg-sink-v2-test.sh        |  1 +
 .../iceberg/test_case/iceberg_connection.slt  | 67 +++++++++++++++++++
 .../iceberg/test_case/iceberg_connection.toml | 12 ++++
 3 files changed, 80 insertions(+)
 create mode 100644 e2e_test/iceberg/test_case/iceberg_connection.slt
 create mode 100644 e2e_test/iceberg/test_case/iceberg_connection.toml

diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh
index 4825e4ab11b1a..30f48db61b5b3 100755
--- a/ci/scripts/e2e-iceberg-sink-v2-test.sh
+++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh
@@ -52,6 +52,7 @@ poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml
 poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml
 poetry run python main.py -t ./test_case/iceberg_source_explain_for_delete.toml
 poetry run python main.py -t ./test_case/iceberg_predicate_pushdown.toml
+poetry run python main.py -t ./test_case/iceberg_connection.toml
 
 echo "--- Running benchmarks"
 poetry run python main.py -t ./benches/predicate_pushdown.toml
diff --git a/e2e_test/iceberg/test_case/iceberg_connection.slt b/e2e_test/iceberg/test_case/iceberg_connection.slt
new file mode 100644
index 0000000000000..abe3236e79af1
--- /dev/null
+++ b/e2e_test/iceberg/test_case/iceberg_connection.slt
@@ -0,0 +1,67 @@
+statement ok
+set streaming_parallelism=4;
+
+statement ok
+CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);
+
+statement ok
+CREATE CONNECTION CONN WITH (
+    type = 'iceberg',
+    catalog.name = 'demo',
+    catalog.type = 'storage',
+    warehouse.path = 's3a://hummock001/iceberg-data',
+    s3.endpoint = 'http://127.0.0.1:9301',
+    s3.region = 'us-east-1',
+    s3.access.key = 'hummockadmin',
+    s3.secret.key = 'hummockadmin'
+);
+
+statement ok
+CREATE SINK sink1 from s1 WITH (
+    connector = 'iceberg',
+    type = 'upsert',
+    database.name = 'demo_db',
+    table.name = 'test_connection_table',
+    connection = conn,
+    create_table_if_not_exists = 'true',
+    commit_checkpoint_interval = 1,
+    primary_key = 'i1,i2',
+);
+
+statement ok
+INSERT INTO s1 (i1, i2, i3) values(1,'1','1'),(2,'2','2'),(3,'3','3'),(4,'4','4'),(5,'5','5');
+
+statement ok
+flush
+
+statement ok
+CREATE SOURCE iceberg_t1_source
+WITH (
+    connector = 'iceberg',
+    connection = conn,
+    database.name = 'demo_db',
+    table.name = 'test_connection_table',
+);
+
+sleep 2s
+
+query I
+select * from iceberg_t1_source order by i1 limit 5;
+----
+1 1 1
+2 2 2
+3 3 3
+4 4 4
+5 5 5
+
+statement ok
+DROP SINK sink1;
+
+statement ok
+DROP SOURCE iceberg_t1_source;
+
+statement ok
+DROP TABLE s1 cascade;
+
+statement ok
+DROP CONNECTION conn;
diff --git a/e2e_test/iceberg/test_case/iceberg_connection.toml b/e2e_test/iceberg/test_case/iceberg_connection.toml
new file mode 100644
index 0000000000000..a7453267f7dd4
--- /dev/null
+++ b/e2e_test/iceberg/test_case/iceberg_connection.toml
@@ -0,0 +1,12 @@
+init_sqls = [
+    'CREATE SCHEMA IF NOT EXISTS demo_db',
+    'DROP TABLE IF EXISTS demo_db.test_connection_table',
+]
+
+slt = 'test_case/iceberg_connection.slt'
+
+drop_sqls = [
+    'DROP TABLE IF EXISTS demo_db.test_connection_table',
+    'DROP SCHEMA IF EXISTS demo_db',
+]
+iceberg_connection
\ No newline at end of file

From f346ed53db8dafb095ef335c28860d5e1489b1e1 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 16 Jan 2025 22:46:44 +0800
Subject: [PATCH 3/5] fmt

---
 src/connector/src/connector_common/connection.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/connector/src/connector_common/connection.rs b/src/connector/src/connector_common/connection.rs
index e953e3a12606b..75a808a6f2eb9 100644
--- a/src/connector/src/connector_common/connection.rs
+++ b/src/connector/src/connector_common/connection.rs
@@ -261,9 +261,9 @@ impl Connection for IcebergConnection {
             token: self.token.clone(),
             oauth2_server_uri: self.oauth2_server_uri.clone(),
             scope: self.scope.clone(),
-            path_style_access: self.path_style_access.clone(),
+            path_style_access: self.path_style_access,
             database_name: Some("test_database".to_owned()),
-            table_name: "table_name".to_owned(),
+            table_name: "test_table".to_owned(),
             enable_config_load: Some(false),
         };
 

From f21959857951ddaf0a0b3ab2ddcd94890735c17e Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Fri, 17 Jan 2025 13:06:28 +0800
Subject: [PATCH 4/5] fmt

---
 e2e_test/iceberg/test_case/iceberg_connection.toml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/e2e_test/iceberg/test_case/iceberg_connection.toml b/e2e_test/iceberg/test_case/iceberg_connection.toml
index a7453267f7dd4..3c64c0a067a44 100644
--- a/e2e_test/iceberg/test_case/iceberg_connection.toml
+++ b/e2e_test/iceberg/test_case/iceberg_connection.toml
@@ -9,4 +9,3 @@ drop_sqls = [
     'DROP TABLE IF EXISTS demo_db.test_connection_table',
     'DROP SCHEMA IF EXISTS demo_db',
 ]
-iceberg_connection
\ No newline at end of file

From 7a0cd5aa6d6acb885ed3682486226c8ae83410e6 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Fri, 17 Jan 2025 14:04:22 +0800
Subject: [PATCH 5/5] increase iceberg sink test timeout

---
 ci/workflows/main-cron.yml    | 2 +-
 ci/workflows/pull-request.yml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index 19beb4e47dfbe..f2d65351c0d96 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -440,7 +440,7 @@ steps:
     plugins:
       - docker-compose#v5.5.0: *docker-compose
       - ./ci/plugins/upload-failure-logs
-    timeout_in_minutes: 15
+    timeout_in_minutes: 17
     retry: *auto-retry
 
   - label: "e2e java-binding test (release)"
diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml
index 42a70a3a4dedf..19fda2f2d99d5 100644
--- a/ci/workflows/pull-request.yml
+++ b/ci/workflows/pull-request.yml
@@ -263,7 +263,7 @@ steps:
           <<: *docker-compose
           run: sink-test-env
       - ./ci/plugins/upload-failure-logs
-    timeout_in_minutes: 17
+    timeout_in_minutes: 19
     retry: *auto-retry
 
   - label: "end-to-end iceberg cdc test"
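
Note: the e2e case above only exercises the storage catalog. For the rest catalog, test_connection takes the branch where warehouse.path may be omitted or hold a bare warehouse name, so the object-storage probe is skipped and the check goes straight to the catalog via table_exists. A connection for that path could be declared as in the sketch below; this is a hypothetical example rather than part of the patches, and the catalog URI, warehouse name, and credentials are placeholders:

-- Hypothetical sketch: a rest-catalog connection exercising the
-- non-URL warehouse.path branch of test_connection(); all values
-- here are placeholders, not values from this patch series.
CREATE CONNECTION conn_rest WITH (
    type = 'iceberg',
    catalog.type = 'rest',
    catalog.name = 'demo_rest',
    catalog.uri = 'http://127.0.0.1:8181',
    warehouse.path = 'my_warehouse',
    s3.region = 'us-east-1',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin'
);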