Skip to content

Commit b364088

Browse files
github-actions[bot]tabVersiontabversion
authored
feat: message_as_jsonb to handle circle dep in protobuf (#19935) (#19987)
Signed-off-by: tabversion <[email protected]> Co-authored-by: Bohan Zhang <[email protected]> Co-authored-by: tabversion <[email protected]>
1 parent 336bbea commit b364088

File tree

10 files changed

+304
-42
lines changed

10 files changed

+304
-42
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
control substitution on
2+
3+
system ok
4+
rpk registry schema create "opentelemetry_common.proto" --schema "/risingwave/src/connector/codec/tests/test_data/opentelemetry_common.proto"
5+
6+
system ok
7+
rpk registry schema create "opentelemetry_test-value" --schema "/dev/stdin" --references opentelemetry_common.proto:opentelemetry_common.proto:1 --type protobuf << EOF
8+
syntax = "proto3";
9+
package opentelemetry_test;
10+
import "opentelemetry_common.proto";
11+
message OTLPTestMessage {
12+
opentelemetry.proto.common.v1.AnyValue any_value = 1;
13+
opentelemetry.proto.common.v1.KeyValueList key_value_list = 2;
14+
opentelemetry.proto.common.v1.InstrumentationScope instrumentation_scope = 3;
15+
}
16+
EOF
17+
18+
19+
system ok
20+
echo '{"any_value":{"string_value":"example"},"key_value_list":{"values":[{"key":"key1","value":{"string_value":"value1"}},{"key":"key2","value":{"int_value":42}}]},"instrumentation_scope":{"name":"test-scope","version":"1.0"}}' | rpk topic produce "opentelemetry_test" --schema-id=topic --schema-type="opentelemetry_test.OTLPTestMessage" --allow-auto-topic-creation
21+
22+
statement ok
23+
create table opentelemetry_test with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'opentelemetry_test' ) format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', message = 'opentelemetry_test.OTLPTestMessage', messages_as_jsonb = 'opentelemetry.proto.common.v1.ArrayValue,opentelemetry.proto.common.v1.KeyValueList,opentelemetry.proto.common.v1.AnyValue');
24+
25+
statement ok
26+
flush;
27+
28+
sleep 1s
29+
30+
query T
31+
select count(*) from opentelemetry_test;
32+
----
33+
1
34+
35+
query TTT
36+
select any_value, key_value_list, instrumentation_scope from opentelemetry_test;
37+
----
38+
{"stringValue": "example"} {"values": [{"key": "key1", "value": {"stringValue": "value1"}}, {"key": "key2", "value": {"intValue": "42"}}]} (test-scope,1.0,{},0)
39+
40+
# ==== clean up ====
41+
42+
statement ok
43+
drop table opentelemetry_test;
44+
45+
system ok
46+
rpk topic delete opentelemetry_test;
47+
48+
system ok
49+
rpk registry subject delete "opentelemetry_test-value"
50+
51+
system ok
52+
rpk registry subject delete "opentelemetry_common.proto"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
control substitution on
2+
3+
system ok
4+
rpk registry schema create "recursive_complex-value" --schema "/dev/stdin" --type protobuf << EOF
5+
syntax = "proto3";
6+
// a recursive complex type can cause stack overflow in the frontend when inferring the schema
7+
package recursive_complex;
8+
message AnyValue {
9+
oneof value {
10+
string string_value = 1;
11+
int32 int_value = 2;
12+
double double_value = 3;
13+
bool bool_value = 4;
14+
ArrayValue array_value = 5;
15+
}
16+
}
17+
message ArrayValue {
18+
AnyValue value1 = 1;
19+
AnyValue value2 = 2;
20+
ArrayValue array_value = 3;
21+
}
22+
EOF
23+
24+
25+
system ok
26+
echo '{"array_value":{"value1":{"string_value":"This is a string value"},"value2":{"int_value":42},"array_value":{"value1":{"double_value":3.14159},"value2":{"bool_value":true},"array_value":{"value1":{"string_value":"Deeply nested string"},"value2":{"int_value":100}}}}}' | rpk topic produce "recursive_complex" --schema-id=topic --schema-type="recursive_complex.AnyValue" --allow-auto-topic-creation
27+
28+
# this test just makes sure that the table creation process can finish
29+
statement ok
30+
create table recursive_complex with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'recursive_complex' ) format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', message = 'recursive_complex.AnyValue', messages_as_jsonb = 'recursive_complex.AnyValue,recursive_complex.ArrayValue');
31+
32+
# ==== clean up ====
33+
34+
statement ok
35+
drop table recursive_complex;
36+
37+
system ok
38+
rpk topic delete recursive_complex;
39+
40+
system ok
41+
rpk registry subject delete "recursive_complex-value"

src/connector/codec/src/decoder/protobuf/mod.rs

+17-9
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
pub mod parser;
1616
use std::borrow::Cow;
17+
use std::collections::HashSet;
1718
use std::sync::LazyLock;
1819

1920
use parser::from_protobuf_value;
@@ -24,13 +25,17 @@ use thiserror_ext::AsReport;
2425

2526
use super::{uncategorized, Access, AccessResult};
2627

27-
pub struct ProtobufAccess {
28+
pub struct ProtobufAccess<'a> {
2829
message: DynamicMessage,
30+
messages_as_jsonb: &'a HashSet<String>,
2931
}
3032

31-
impl ProtobufAccess {
32-
pub fn new(message: DynamicMessage) -> Self {
33-
Self { message }
33+
impl<'a> ProtobufAccess<'a> {
34+
pub fn new(message: DynamicMessage, messages_as_jsonb: &'a HashSet<String>) -> Self {
35+
Self {
36+
message,
37+
messages_as_jsonb,
38+
}
3439
}
3540

3641
#[cfg(test)]
@@ -39,7 +44,7 @@ impl ProtobufAccess {
3944
}
4045
}
4146

42-
impl Access for ProtobufAccess {
47+
impl Access for ProtobufAccess<'_> {
4348
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
4449
debug_assert_eq!(1, path.len());
4550
let field_desc = self
@@ -56,12 +61,15 @@ impl Access for ProtobufAccess {
5661
})?;
5762

5863
match self.message.get_field(&field_desc) {
59-
Cow::Borrowed(value) => from_protobuf_value(&field_desc, value, type_expected),
64+
Cow::Borrowed(value) => {
65+
from_protobuf_value(&field_desc, value, type_expected, self.messages_as_jsonb)
66+
}
6067

6168
// `Owned` variant occurs only if there's no such field and the default value is returned.
62-
Cow::Owned(value) => from_protobuf_value(&field_desc, &value, type_expected)
63-
// enforce `Owned` variant to avoid returning a reference to a temporary value
64-
.map(|d| d.to_owned_datum().into()),
69+
Cow::Owned(value) => {
70+
from_protobuf_value(&field_desc, &value, type_expected, self.messages_as_jsonb)
71+
.map(|d| d.to_owned_datum().into())
72+
}
6573
}
6674
}
6775
}

src/connector/codec/src/decoder/protobuf/parser.rs

+55-13
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashSet;
16+
1517
use anyhow::Context;
1618
use itertools::Itertools;
1719
use prost_reflect::{Cardinality, FieldDescriptor, Kind, MessageDescriptor, ReflectMessage, Value};
@@ -26,14 +28,22 @@ use thiserror_ext::Macro;
2628

2729
use crate::decoder::{uncategorized, AccessError, AccessResult};
2830

31+
pub const PROTOBUF_MESSAGES_AS_JSONB: &str = "messages_as_jsonb";
32+
2933
pub fn pb_schema_to_column_descs(
3034
message_descriptor: &MessageDescriptor,
35+
messages_as_jsonb: &HashSet<String>,
3136
) -> anyhow::Result<Vec<ColumnDesc>> {
3237
let mut columns = Vec::with_capacity(message_descriptor.fields().len());
3338
let mut index = 0;
3439
let mut parse_trace: Vec<String> = vec![];
3540
for field in message_descriptor.fields() {
36-
columns.push(pb_field_to_col_desc(&field, &mut index, &mut parse_trace)?);
41+
columns.push(pb_field_to_col_desc(
42+
&field,
43+
&mut index,
44+
&mut parse_trace,
45+
messages_as_jsonb,
46+
)?);
3747
}
3848

3949
Ok(columns)
@@ -44,15 +54,18 @@ fn pb_field_to_col_desc(
4454
field_descriptor: &FieldDescriptor,
4555
index: &mut i32,
4656
parse_trace: &mut Vec<String>,
57+
messages_as_jsonb: &HashSet<String>,
4758
) -> anyhow::Result<ColumnDesc> {
48-
let field_type = protobuf_type_mapping(field_descriptor, parse_trace)
59+
let field_type = protobuf_type_mapping(field_descriptor, parse_trace, messages_as_jsonb)
4960
.context("failed to map protobuf type")?;
50-
if let Kind::Message(m) = field_descriptor.kind() {
61+
if let Kind::Message(m) = field_descriptor.kind()
62+
&& !messages_as_jsonb.contains(m.full_name())
63+
{
5164
let field_descs = if let DataType::List { .. } = field_type {
5265
vec![]
5366
} else {
5467
m.fields()
55-
.map(|f| pb_field_to_col_desc(&f, index, parse_trace))
68+
.map(|f| pb_field_to_col_desc(&f, index, parse_trace, messages_as_jsonb))
5669
.try_collect()?
5770
};
5871
*index += 1;
@@ -92,10 +105,12 @@ fn detect_loop_and_push(
92105
let identifier = format!("{}({})", fd.name(), fd.full_name());
93106
if trace.iter().any(|s| s == identifier.as_str()) {
94107
bail_protobuf_type_error!(
95-
"circular reference detected: {}, conflict with {}, kind {:?}",
108+
"circular reference detected: {}, conflict with {}, kind {:?}. Adding {:?} to {:?} may help.",
96109
trace.iter().format("->"),
97110
identifier,
98111
fd.kind(),
112+
fd.kind(),
113+
PROTOBUF_MESSAGES_AS_JSONB,
99114
);
100115
}
101116
trace.push(identifier);
@@ -106,6 +121,7 @@ pub fn from_protobuf_value<'a>(
106121
field_desc: &FieldDescriptor,
107122
value: &'a Value,
108123
type_expected: &DataType,
124+
messages_as_jsonb: &'a HashSet<String>,
109125
) -> AccessResult<DatumCow<'a>> {
110126
let kind = field_desc.kind();
111127

@@ -136,7 +152,7 @@ pub fn from_protobuf_value<'a>(
136152
ScalarImpl::Utf8(enum_symbol.name().into())
137153
}
138154
Value::Message(dyn_msg) => {
139-
if dyn_msg.descriptor().full_name() == "google.protobuf.Any" {
155+
if messages_as_jsonb.contains(dyn_msg.descriptor().full_name()) {
140156
ScalarImpl::Jsonb(JsonbVal::from(
141157
serde_json::to_value(dyn_msg).map_err(AccessError::ProtobufAnyToJson)?,
142158
))
@@ -159,8 +175,13 @@ pub fn from_protobuf_value<'a>(
159175
};
160176
let value = dyn_msg.get_field(&field_desc);
161177
rw_values.push(
162-
from_protobuf_value(&field_desc, &value, expected_field_type)?
163-
.to_owned_datum(),
178+
from_protobuf_value(
179+
&field_desc,
180+
&value,
181+
expected_field_type,
182+
messages_as_jsonb,
183+
)?
184+
.to_owned_datum(),
164185
);
165186
}
166187
ScalarImpl::Struct(StructValue::new(rw_values))
@@ -176,7 +197,12 @@ pub fn from_protobuf_value<'a>(
176197
};
177198
let mut builder = element_type.create_array_builder(values.len());
178199
for value in values {
179-
builder.append(from_protobuf_value(field_desc, value, element_type)?);
200+
builder.append(from_protobuf_value(
201+
field_desc,
202+
value,
203+
element_type,
204+
messages_as_jsonb,
205+
)?);
180206
}
181207
ScalarImpl::List(ListValue::new(builder.finish()))
182208
}
@@ -209,11 +235,13 @@ pub fn from_protobuf_value<'a>(
209235
&map_desc.map_entry_key_field(),
210236
&key.clone().into(),
211237
map_type.key(),
238+
messages_as_jsonb,
212239
)?);
213240
value_builder.append(from_protobuf_value(
214241
&map_desc.map_entry_value_field(),
215242
value,
216243
map_type.value(),
244+
messages_as_jsonb,
217245
)?);
218246
}
219247
let keys = key_builder.finish();
@@ -231,6 +259,7 @@ pub fn from_protobuf_value<'a>(
231259
fn protobuf_type_mapping(
232260
field_descriptor: &FieldDescriptor,
233261
parse_trace: &mut Vec<String>,
262+
messages_as_jsonb: &HashSet<String>,
234263
) -> std::result::Result<DataType, ProtobufTypeError> {
235264
detect_loop_and_push(parse_trace, field_descriptor)?;
236265
let mut t = match field_descriptor.kind() {
@@ -245,20 +274,33 @@ fn protobuf_type_mapping(
245274
Kind::Uint64 | Kind::Fixed64 => DataType::Decimal,
246275
Kind::String => DataType::Varchar,
247276
Kind::Message(m) => {
248-
if m.full_name() == "google.protobuf.Any" {
277+
if messages_as_jsonb.contains(m.full_name()) {
249278
// Messages listed in `messages_as_jsonb` are identified by their full name and mapped to JSONB
250279
DataType::Jsonb
251280
} else if m.is_map_entry() {
252281
// Map is equivalent to `repeated MapFieldEntry map_field = N;`
253282
debug_assert!(field_descriptor.is_map());
254-
let key = protobuf_type_mapping(&m.map_entry_key_field(), parse_trace)?;
255-
let value = protobuf_type_mapping(&m.map_entry_value_field(), parse_trace)?;
283+
let key = protobuf_type_mapping(
284+
&m.map_entry_key_field(),
285+
parse_trace,
286+
messages_as_jsonb,
287+
)?;
288+
let value = protobuf_type_mapping(
289+
&m.map_entry_value_field(),
290+
parse_trace,
291+
messages_as_jsonb,
292+
)?;
256293
_ = parse_trace.pop();
257294
return Ok(DataType::Map(MapType::from_kv(key, value)));
258295
} else {
259296
let fields = m
260297
.fields()
261-
.map(|f| Ok((f.name().to_owned(), protobuf_type_mapping(&f, parse_trace)?)))
298+
.map(|f| {
299+
Ok((
300+
f.name().to_owned(),
301+
protobuf_type_mapping(&f, parse_trace, messages_as_jsonb)?,
302+
))
303+
})
262304
.try_collect::<_, Vec<_>, _>()?;
263305
StructType::new(fields).into()
264306
}

0 commit comments

Comments
 (0)