feat(iceberg): support iceberg connection (#20189)
chenzl25 authored Jan 18, 2025
1 parent 2d31661 commit 47e36be
Showing 6 changed files with 250 additions and 6 deletions.
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
@@ -52,6 +52,7 @@ poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_explain_for_delete.toml
poetry run python main.py -t ./test_case/iceberg_predicate_pushdown.toml
+poetry run python main.py -t ./test_case/iceberg_connection.toml

echo "--- Running benchmarks"
poetry run python main.py -t ./benches/predicate_pushdown.toml
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
@@ -440,7 +440,7 @@ steps:
plugins:
- docker-compose#v5.5.0: *docker-compose
- ./ci/plugins/upload-failure-logs
-  timeout_in_minutes: 15
+  timeout_in_minutes: 17
retry: *auto-retry

- label: "e2e java-binding test (release)"
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
@@ -263,7 +263,7 @@ steps:
<<: *docker-compose
run: sink-test-env
- ./ci/plugins/upload-failure-logs
-  timeout_in_minutes: 17
+  timeout_in_minutes: 19
retry: *auto-retry

- label: "end-to-end iceberg cdc test"
67 changes: 67 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_connection.slt
@@ -0,0 +1,67 @@
statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
CREATE CONNECTION CONN WITH (
    type = 'iceberg',
    catalog.name = 'demo',
    catalog.type = 'storage',
    warehouse.path = 's3a://hummock001/iceberg-data',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.region = 'us-east-1',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin'
);

statement ok
CREATE SINK sink1 FROM s1 WITH (
    connector = 'iceberg',
    type = 'upsert',
    database.name = 'demo_db',
    table.name = 'test_connection_table',
    connection = conn,
    create_table_if_not_exists = 'true',
    commit_checkpoint_interval = 1,
    primary_key = 'i1,i2'
);

statement ok
INSERT INTO s1 (i1, i2, i3) values(1,'1','1'),(2,'2','2'),(3,'3','3'),(4,'4','4'),(5,'5','5');

statement ok
flush

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
    connector = 'iceberg',
    connection = conn,
    database.name = 'demo_db',
    table.name = 'test_connection_table'
);

sleep 2s

query I
select * from iceberg_t1_source order by i1 limit 5;
----
1 1 1
2 2 2
3 3 3
4 4 4
5 5 5

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;

statement ok
DROP CONNECTION conn;
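Since RisingWave speaks the PostgreSQL wire protocol, the statements above can also be driven from an ordinary Postgres client. A minimal, hypothetical sketch using tokio-postgres (the address, default port 4566, user, and the trimmed option list are illustrative assumptions, not part of this commit):

use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Assumes a local RisingWave frontend on its default pgwire port.
    let (client, connection) =
        tokio_postgres::connect("host=127.0.0.1 port=4566 user=root dbname=dev", NoTls).await?;
    // The connection object drives the socket; poll it on its own task.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });
    // Option list trimmed for illustration; a storage catalog without
    // credentials would fail validation against a real object store.
    client
        .batch_execute(
            "CREATE CONNECTION conn_demo WITH ( \
                type = 'iceberg', \
                catalog.type = 'storage', \
                warehouse.path = 's3a://hummock001/iceberg-data' \
            );",
        )
        .await?;
    Ok(())
}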
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_connection.toml
@@ -0,0 +1,11 @@
init_sqls = [
    'CREATE SCHEMA IF NOT EXISTS demo_db',
    'DROP TABLE IF EXISTS demo_db.test_connection_table',
]

slt = 'test_case/iceberg_connection.slt'

drop_sqls = [
    'DROP TABLE IF EXISTS demo_db.test_connection_table',
    'DROP SCHEMA IF EXISTS demo_db',
]
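The harness consuming these files is the Python runner invoked through poetry in the CI script above. Purely as documentation of the three fields, a hypothetical Rust mirror of the schema could look like this:

use serde::Deserialize;

// Hypothetical mirror of the test-case schema (the real harness is the
// Python runner invoked via `poetry run python main.py`).
#[derive(Debug, Deserialize)]
struct TestCase {
    init_sqls: Vec<String>, // run before the slt file
    slt: String,            // path of the sqllogictest script
    drop_sqls: Vec<String>, // cleanup after the run
}

fn main() {
    let doc = r#"
        init_sqls = ['CREATE SCHEMA IF NOT EXISTS demo_db']
        slt = 'test_case/iceberg_connection.slt'
        drop_sqls = ['DROP SCHEMA IF EXISTS demo_db']
    "#;
    let case: TestCase = toml::from_str(doc).unwrap();
    assert_eq!(case.slt, "test_case/iceberg_connection.slt");
}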
173 changes: 169 additions & 4 deletions src/connector/src/connector_common/connection.rs
@@ -12,22 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use anyhow::Context;
use opendal::services::{Gcs, S3};
use opendal::Operator;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::ClientConfig;
use risingwave_common::bail;
use risingwave_common::secret::LocalSecretManager;
use risingwave_pb::catalog::PbConnection;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tonic::async_trait;
use url::Url;
use with_options::WithOptions;

-use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};
+use crate::connector_common::{
+    AwsAuthProps, IcebergCommon, KafkaConnectionProps, KafkaPrivateLinkCommon,
+};
use crate::error::ConnectorResult;
use crate::schema::schema_registry::Client as ConfluentSchemaRegistryClient;
use crate::source::kafka::{KafkaContextCommon, RwConsumerContext};
-use crate::{dispatch_connection_impl, ConnectionImpl};
+use crate::{deserialize_optional_bool_from_string, dispatch_connection_impl, ConnectionImpl};

pub const SCHEMA_REGISTRY_CONNECTION_TYPE: &str = "schema_registry";

@@ -109,12 +117,169 @@ impl KafkaConnection {
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
-pub struct IcebergConnection {}
pub struct IcebergConnection {
    #[serde(rename = "catalog.type")]
    pub catalog_type: Option<String>,
    #[serde(rename = "s3.region")]
    pub region: Option<String>,
    #[serde(rename = "s3.endpoint")]
    pub endpoint: Option<String>,
    #[serde(rename = "s3.access.key")]
    pub access_key: Option<String>,
    #[serde(rename = "s3.secret.key")]
    pub secret_key: Option<String>,

    #[serde(rename = "gcs.credential")]
    pub gcs_credential: Option<String>,

    /// Path of the Iceberg warehouse. Only applicable to the storage catalog.
    #[serde(rename = "warehouse.path")]
    pub warehouse_path: Option<String>,
    /// Catalog name; can be omitted for the storage catalog but must be set for other catalogs.
    #[serde(rename = "catalog.name")]
    pub catalog_name: Option<String>,
    /// URI of the Iceberg catalog. Only applicable to the rest catalog.
    #[serde(rename = "catalog.uri")]
    pub catalog_uri: Option<String>,
    /// Credential for accessing the Iceberg catalog. Only applicable to the rest catalog.
    /// A credential to exchange for a token in the OAuth2 client credentials flow.
    #[serde(rename = "catalog.credential")]
    pub credential: Option<String>,
    /// Token for accessing the Iceberg catalog. Only applicable to the rest catalog.
    /// A Bearer token which will be used for interaction with the server.
    #[serde(rename = "catalog.token")]
    pub token: Option<String>,
    /// `oauth2-server-uri` for accessing the Iceberg catalog. Only applicable to the rest catalog.
    /// Token endpoint URI to fetch a token from, if the rest catalog is not the authorization server.
    #[serde(rename = "catalog.oauth2-server-uri")]
    pub oauth2_server_uri: Option<String>,
    /// Scope for accessing the Iceberg catalog. Only applicable to the rest catalog.
    /// Additional scope for OAuth2.
    #[serde(rename = "catalog.scope")]
    pub scope: Option<String>,

    #[serde(
        rename = "s3.path.style.access",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub path_style_access: Option<bool>,

    #[serde(rename = "catalog.jdbc.user")]
    pub jdbc_user: Option<String>,

    #[serde(rename = "catalog.jdbc.password")]
    pub jdbc_password: Option<String>,
}
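The dotted option keys from the WITH clause are routed onto these fields by the serde rename attributes. A minimal sketch with a trimmed, hypothetical mirror struct (the real type also derives crate-internal traits such as WithOptions):

use serde::Deserialize;

// Hypothetical two-field mirror of IcebergConnection, just to show the
// rename mechanics.
#[derive(Debug, Deserialize)]
struct IcebergConnLite {
    #[serde(rename = "catalog.type")]
    catalog_type: Option<String>,
    #[serde(rename = "warehouse.path")]
    warehouse_path: Option<String>,
}

fn main() {
    // Options arrive as string key/value pairs whose dotted names line up
    // with the rename attributes.
    let props = r#"{ "catalog.type": "storage", "warehouse.path": "s3a://bucket/path" }"#;
    let conn: IcebergConnLite = serde_json::from_str(props).unwrap();
    assert_eq!(conn.catalog_type.as_deref(), Some("storage"));
    assert_eq!(conn.warehouse_path.as_deref(), Some("s3a://bucket/path"));
}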

#[async_trait]
impl Connection for IcebergConnection {
    async fn test_connection(&self) -> ConnectorResult<()> {
-        todo!()
        let info = match &self.warehouse_path {
            Some(warehouse_path) => {
                let url = Url::parse(warehouse_path);
                if url.is_err()
                    && let Some(catalog_type) = &self.catalog_type
                    && catalog_type == "rest"
                {
                    // If the warehouse path is not a valid URL, it could be a
                    // warehouse name in a rest catalog, so we allow it to pass here.
                    None
                } else {
                    let url = url
                        .with_context(|| format!("Invalid warehouse path: {}", warehouse_path))?;
                    let bucket = url
                        .host_str()
                        .with_context(|| {
                            format!("Invalid s3 path: {}, bucket is missing", warehouse_path)
                        })?
                        .to_owned();
                    let root = url.path().trim_start_matches('/').to_owned();
                    Some((url.scheme().to_owned(), bucket, root))
                }
            }
            None => {
                if let Some(catalog_type) = &self.catalog_type
                    && catalog_type == "rest"
                {
                    None
                } else {
                    bail!("`warehouse.path` must be set");
                }
            }
        };

        // Test the storage backend: build an OpenDAL operator for the scheme
        // and probe it with `check()`.
        if let Some((scheme, bucket, root)) = info {
            match scheme.as_str() {
                "s3" | "s3a" => {
                    let mut builder = S3::default();
                    if let Some(region) = &self.region {
                        builder = builder.region(region);
                    }
                    if let Some(endpoint) = &self.endpoint {
                        builder = builder.endpoint(endpoint);
                    }
                    if let Some(access_key) = &self.access_key {
                        builder = builder.access_key_id(access_key);
                    }
                    if let Some(secret_key) = &self.secret_key {
                        builder = builder.secret_access_key(secret_key);
                    }
                    builder = builder.root(root.as_str()).bucket(bucket.as_str());
                    let op = Operator::new(builder)?.finish();
                    op.check().await?;
                }
                "gs" | "gcs" => {
                    let mut builder = Gcs::default();
                    if let Some(credential) = &self.gcs_credential {
                        builder = builder.credential(credential);
                    }
                    builder = builder.root(root.as_str()).bucket(bucket.as_str());
                    let op = Operator::new(builder)?.finish();
                    op.check().await?;
                }
                _ => {
                    bail!("Unsupported scheme: {}", scheme);
                }
            }
        }

        // Test the catalog: reuse `IcebergCommon` with placeholder
        // database/table names so a catalog client can be built.
        let iceberg_common = IcebergCommon {
            catalog_type: self.catalog_type.clone(),
            region: self.region.clone(),
            endpoint: self.endpoint.clone(),
            access_key: self.access_key.clone(),
            secret_key: self.secret_key.clone(),
            gcs_credential: self.gcs_credential.clone(),
            warehouse_path: self.warehouse_path.clone(),
            catalog_name: self.catalog_name.clone(),
            catalog_uri: self.catalog_uri.clone(),
            credential: self.credential.clone(),
            token: self.token.clone(),
            oauth2_server_uri: self.oauth2_server_uri.clone(),
            scope: self.scope.clone(),
            path_style_access: self.path_style_access,
            database_name: Some("test_database".to_owned()),
            table_name: "test_table".to_owned(),
            enable_config_load: Some(false),
        };

        let mut java_map = HashMap::new();
        if let Some(jdbc_user) = &self.jdbc_user {
            java_map.insert("jdbc.user".to_owned(), jdbc_user.to_owned());
        }
        if let Some(jdbc_password) = &self.jdbc_password {
            java_map.insert("jdbc.password".to_owned(), jdbc_password.to_owned());
        }
        let catalog = iceberg_common.create_catalog(&java_map).await?;
        // Probe the catalog via the `table_exists` API; the placeholder table
        // need not exist, the call only has to round-trip successfully.
        catalog
            .table_exists(&iceberg_common.full_table_name()?)
            .await?;
        Ok(())
    }
}
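For reference, a standalone sketch of the warehouse-path splitting performed above, using the url crate: the scheme selects the object store, the host is treated as the bucket, and the path becomes the root.

use url::Url;

fn main() {
    // Mirrors the parsing in `test_connection`: scheme, bucket, root.
    let url = Url::parse("s3a://hummock001/iceberg-data").unwrap();
    assert_eq!(url.scheme(), "s3a");
    assert_eq!(url.host_str(), Some("hummock001"));
    assert_eq!(url.path().trim_start_matches('/'), "iceberg-data");
}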
