Skip to content

Commit

Permalink
feat: Add pulsar iceberg table reader (risingwavelabs#12735)
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 authored Oct 26, 2023
1 parent efdf3c9 commit e74b32d
Show file tree
Hide file tree
Showing 7 changed files with 400 additions and 10 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.0" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "16dab0e36ab337e58ee8002d828def2d212fa116" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "186fde7663545d1d6a5856ce9fbbc541224eadfb" }
arrow-array = "48"
arrow-cast = "48"
arrow-schema = "48"
Expand Down
10 changes: 7 additions & 3 deletions src/connector/src/aws_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ use url::Url;

use crate::aws_auth::AwsAuthProps;

pub const REGION: &str = "region";
pub const ACCESS_KEY: &str = "access_key";
pub const SECRET_ACCESS: &str = "secret_access";

pub const AWS_DEFAULT_CONFIG: [&str; 7] = [
"region",
REGION,
"arn",
"profile",
"access_key",
"secret_access",
ACCESS_KEY,
SECRET_ACCESS,
"session_token",
"endpoint_url",
];
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub enum ConnectorError {
#[error("MySQL error: {0}")]
MySql(#[from] mysql_async::Error),

#[error("Pulsar error: {0}")]
Pulsar(anyhow::Error),

#[error(transparent)]
Internal(#[from] anyhow::Error),
}
Expand Down
11 changes: 10 additions & 1 deletion src/connector/src/source/pulsar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ pub mod topic;

pub use enumerator::*;
use serde::Deserialize;
use serde_with::serde_as;
pub use split::*;

use self::source::reader::PulsarSplitReader;
use crate::common::PulsarCommon;
use crate::source::pulsar::source::reader::PulsarSplitReader;
use crate::source::SourceProperties;

pub const PULSAR_CONNECTOR: &str = "pulsar";
Expand All @@ -36,6 +37,7 @@ impl SourceProperties for PulsarProperties {
}

#[derive(Clone, Debug, Deserialize)]
#[serde_as]
pub struct PulsarProperties {
#[serde(rename = "scan.startup.mode", alias = "pulsar.scan.startup.mode")]
pub scan_startup_mode: Option<String>,
Expand All @@ -45,4 +47,11 @@ pub struct PulsarProperties {

#[serde(flatten)]
pub common: PulsarCommon,

#[serde(rename = "iceberg.enabled")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub iceberg_loader_enabled: bool,

#[serde(rename = "iceberg.bucket", default)]
pub iceberg_bucket: Option<String>,
}
Loading

0 comments on commit e74b32d

Please sign in to comment.