Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iceberg): support iceberg connection #20189

Merged
merged 6 commits into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_explain_for_delete.toml
poetry run python main.py -t ./test_case/iceberg_predicate_pushdown.toml
poetry run python main.py -t ./test_case/iceberg_connection.toml

echo "--- Running benchmarks"
poetry run python main.py -t ./benches/predicate_pushdown.toml
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ steps:
plugins:
- docker-compose#v5.5.0: *docker-compose
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
timeout_in_minutes: 17
retry: *auto-retry

- label: "e2e java-binding test (release)"
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ steps:
<<: *docker-compose
run: sink-test-env
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 17
timeout_in_minutes: 19
retry: *auto-retry

- label: "end-to-end iceberg cdc test"
Expand Down
67 changes: 67 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_connection.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# E2E test: an iceberg CONNECTION object can be created once and then
# referenced by name from both a sink and a source.

statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

# Create the connection holding catalog + S3 credentials (storage catalog).
statement ok
CREATE CONNECTION CONN WITH (
type = 'iceberg',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://hummock001/iceberg-data',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin'
);

# Sink refers to the connection by name instead of repeating credentials.
statement ok
CREATE SINK sink1 from s1 WITH (
connector = 'iceberg',
type = 'upsert',
database.name = 'demo_db',
table.name = 'test_connection_table',
connection = conn,
create_table_if_not_exists = 'true',
commit_checkpoint_interval = 1,
primary_key = 'i1,i2',
);

statement ok
INSERT INTO s1 (i1, i2, i3) values(1,'1','1'),(2,'2','2'),(3,'3','3'),(4,'4','4'),(5,'5','5');

statement ok
flush

# Source reads the table the sink wrote, again via the shared connection.
statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
connection = conn,
database.name = 'demo_db',
table.name = 'test_connection_table',
);

# Give the sink time to commit a checkpoint before reading back.
sleep 2s

query I
select * from iceberg_t1_source order by i1 limit 5;
----
1 1 1
2 2 2
3 3 3
4 4 4
5 5 5

# Cleanup: connection must be dropped last, after its dependents.
statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;

statement ok
DROP CONNECTION conn;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_connection.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# SQLs run against the iceberg catalog before the slt file, to start from a
# clean schema.
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.test_connection_table',
]

# The sqllogictest file exercised by this test case.
slt = 'test_case/iceberg_connection.slt'

# Cleanup SQLs run after the slt file completes.
drop_sqls = [
'DROP TABLE IF EXISTS demo_db.test_connection_table',
'DROP SCHEMA IF EXISTS demo_db',
]
173 changes: 169 additions & 4 deletions src/connector/src/connector_common/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::Context;
use opendal::services::{Gcs, S3};
use opendal::Operator;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::ClientConfig;
use risingwave_common::bail;
use risingwave_common::secret::LocalSecretManager;
use risingwave_pb::catalog::PbConnection;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tonic::async_trait;
use url::Url;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};
use crate::connector_common::{
AwsAuthProps, IcebergCommon, KafkaConnectionProps, KafkaPrivateLinkCommon,
};
use crate::error::ConnectorResult;
use crate::schema::schema_registry::Client as ConfluentSchemaRegistryClient;
use crate::source::kafka::{KafkaContextCommon, RwConsumerContext};
use crate::{dispatch_connection_impl, ConnectionImpl};
use crate::{deserialize_optional_bool_from_string, dispatch_connection_impl, ConnectionImpl};

pub const SCHEMA_REGISTRY_CONNECTION_TYPE: &str = "schema_registry";

Expand Down Expand Up @@ -109,12 +117,169 @@ impl KafkaConnection {
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergConnection {}
pub struct IcebergConnection {
    /// Type of the iceberg catalog, e.g. `storage` or `rest`.
    /// `test_connection` special-cases `rest` (warehouse path optional there).
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    /// Region of the S3 bucket backing the warehouse.
    #[serde(rename = "s3.region")]
    pub region: Option<String>,
    /// Custom S3 endpoint, e.g. for MinIO or another S3-compatible store.
    #[serde(rename = "s3.endpoint")]
    pub endpoint: Option<String>,
    /// S3 access key id.
    #[serde(rename = "s3.access.key")]
    pub access_key: Option<String>,
    /// S3 secret access key.
    #[serde(rename = "s3.secret.key")]
    pub secret_key: Option<String>,

    /// Credential used when the warehouse lives on GCS (`gs://` / `gcs://` schemes).
    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    /// Path of iceberg warehouse, only applicable in storage catalog.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name, can be omitted for storage catalog, but
    /// must be set for other catalogs.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of iceberg catalog, only applicable in rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Credential for accessing iceberg catalog, only applicable in rest catalog.
    /// A credential to exchange for a token in the OAuth2 client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub credential: Option<String>,
    /// token for accessing iceberg catalog, only applicable in rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub token: Option<String>,
    /// `oauth2-server-uri` for accessing iceberg catalog, only applicable in rest catalog.
    /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2-server-uri")]
    pub oauth2_server_uri: Option<String>,
    /// scope for accessing iceberg catalog, only applicable in rest catalog.
    /// Additional scope for OAuth2.
    #[serde(rename = "catalog.scope")]
    pub scope: Option<String>,

    /// Whether to use S3 path-style access; deserialized from a string
    /// value (e.g. `'true'`) rather than a bare TOML/SQL boolean.
    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub path_style_access: Option<bool>,

    /// User for jdbc-backed catalogs; forwarded to the catalog as `jdbc.user`.
    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,

    /// Password for jdbc-backed catalogs; forwarded to the catalog as `jdbc.password`.
    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,
}

#[async_trait]
impl Connection for IcebergConnection {
    /// Validates the connection in two steps:
    /// 1. If a `warehouse.path` is configured (or required), probe the backing
    ///    object store (S3 or GCS) with a lightweight `check` call.
    /// 2. Build the iceberg catalog from the connection properties and issue a
    ///    `table_exists` call against a dummy table to verify catalog access.
    ///
    /// # Errors
    /// Returns an error when `warehouse.path` is missing for non-rest catalogs,
    /// cannot be parsed, uses an unsupported scheme, or when either the storage
    /// probe or the catalog call fails.
    async fn test_connection(&self) -> ConnectorResult<()> {
        // Rest catalogs may omit `warehouse.path`, or use a plain warehouse
        // name that is not a parseable URL. Compute this once instead of
        // re-matching `catalog_type` in every branch.
        let is_rest_catalog = self.catalog_type.as_deref() == Some("rest");

        // Resolve `warehouse.path` into (scheme, bucket, root); `None` means
        // the storage probe is skipped (rest catalog without a storage URL).
        let info = match &self.warehouse_path {
            Some(warehouse_path) => {
                let url = Url::parse(warehouse_path);
                if url.is_err() && is_rest_catalog {
                    // If the warehouse path is not a valid URL, it could be a
                    // warehouse name in rest catalog, so we allow it to pass here.
                    None
                } else {
                    let url =
                        url.with_context(|| format!("Invalid warehouse path: {}", warehouse_path))?;
                    let bucket = url
                        .host_str()
                        .with_context(|| {
                            format!("Invalid s3 path: {}, bucket is missing", warehouse_path)
                        })?
                        .to_owned();
                    let root = url.path().trim_start_matches('/').to_owned();
                    Some((url.scheme().to_owned(), bucket, root))
                }
            }
            None => {
                if is_rest_catalog {
                    None
                } else {
                    bail!("`warehouse.path` must be set");
                }
            }
        };

        // Step 1: test storage access.
        if let Some((scheme, bucket, root)) = info {
            match scheme.as_str() {
                "s3" | "s3a" => {
                    let mut builder = S3::default();
                    if let Some(region) = &self.region {
                        builder = builder.region(region);
                    }
                    if let Some(endpoint) = &self.endpoint {
                        builder = builder.endpoint(endpoint);
                    }
                    if let Some(access_key) = &self.access_key {
                        builder = builder.access_key_id(access_key);
                    }
                    if let Some(secret_key) = &self.secret_key {
                        builder = builder.secret_access_key(secret_key);
                    }
                    builder = builder.root(root.as_str()).bucket(bucket.as_str());
                    let op = Operator::new(builder)?.finish();
                    op.check().await?;
                }
                "gs" | "gcs" => {
                    let mut builder = Gcs::default();
                    if let Some(credential) = &self.gcs_credential {
                        builder = builder.credential(credential);
                    }
                    builder = builder.root(root.as_str()).bucket(bucket.as_str());
                    let op = Operator::new(builder)?.finish();
                    op.check().await?;
                }
                _ => {
                    bail!("Unsupported scheme: {}", scheme);
                }
            }
        }

        // Step 2: test catalog access. The database/table names are dummies —
        // we only need `table_exists` to round-trip with the catalog server.
        let iceberg_common = IcebergCommon {
            catalog_type: self.catalog_type.clone(),
            region: self.region.clone(),
            endpoint: self.endpoint.clone(),
            access_key: self.access_key.clone(),
            secret_key: self.secret_key.clone(),
            gcs_credential: self.gcs_credential.clone(),
            warehouse_path: self.warehouse_path.clone(),
            catalog_name: self.catalog_name.clone(),
            catalog_uri: self.catalog_uri.clone(),
            credential: self.credential.clone(),
            token: self.token.clone(),
            oauth2_server_uri: self.oauth2_server_uri.clone(),
            scope: self.scope.clone(),
            path_style_access: self.path_style_access,
            database_name: Some("test_database".to_owned()),
            table_name: "test_table".to_owned(),
            enable_config_load: Some(false),
        };

        // Extra java-catalog properties for jdbc-backed catalogs.
        let mut java_map = HashMap::new();
        if let Some(jdbc_user) = &self.jdbc_user {
            java_map.insert("jdbc.user".to_owned(), jdbc_user.to_owned());
        }
        if let Some(jdbc_password) = &self.jdbc_password {
            java_map.insert("jdbc.password".to_owned(), jdbc_password.to_owned());
        }
        let catalog = iceberg_common.create_catalog(&java_map).await?;
        // test catalog by `table_exists` api
        catalog
            .table_exists(&iceberg_common.full_table_name()?)
            .await?;
        Ok(())
    }
}

Expand Down
Loading