diff --git a/.github/workflows/connector-node-integration.yml b/.github/workflows/connector-node-integration.yml
index f20f209a3953d..abc1942055fb5 100644
--- a/.github/workflows/connector-node-integration.yml
+++ b/.github/workflows/connector-node-integration.yml
@@ -2,11 +2,11 @@ name: Connector Node Integration Tests
 
 on:
   push:
-    branches: [main]
+    branches: [ main ]
   pull_request:
-    branches: [main]
+    branches: [ main ]
   merge_group:
-    types: [checks_requested]
+    types: [ checks_requested ]
 
 jobs:
   build:
@@ -42,4 +42,8 @@ jobs:
           echo "--- build connector node"
           cd ${RISINGWAVE_ROOT}/java
           # run unit test
-          mvn --batch-mode --update-snapshots clean package -Dno-build-rust
+          # WARN: `testOnNext_writeValidation` is skipped because it relies on Rust code to decode messages,
+          # while we don't build Rust code here (`-Dno-build-rust`) to save time.
+          mvn --batch-mode --update-snapshots clean package -Dno-build-rust \
+            '-Dtest=!com.risingwave.connector.sink.SinkStreamObserverTest#testOnNext_writeValidation' \
+            -Dsurefire.failIfNoSpecifiedTests=false
diff --git a/ci/scripts/connector-node-integration-test.sh b/ci/scripts/connector-node-integration-test.sh
index c90430d80ba3e..8853243b66805 100755
--- a/ci/scripts/connector-node-integration-test.sh
+++ b/ci/scripts/connector-node-integration-test.sh
@@ -90,15 +90,15 @@ export PYTHONPATH=proto
 
 echo "--- running streamchunk data format integration tests"
 cd "${RISINGWAVE_ROOT}"/java/connector-node/python-client
-if python3 integration_tests.py --stream_chunk_format_test --input_binary_file="./data/stream_chunk_data" --data_format_use_json=False; then
+if python3 integration_tests.py --stream_chunk_format_test --input_binary_file="./data/stream_chunk_data"; then
     echo "StreamChunk data format test passed"
 else
     echo "StreamChunk data format test failed"
     exit 1
 fi
 
-sink_input_feature=("--input_binary_file=./data/sink_input --data_format_use_json=False")
-upsert_sink_input_feature=("--input_binary_file=./data/upsert_sink_input --data_format_use_json=False")
+sink_input_feature=("--input_binary_file=./data/sink_input")
+upsert_sink_input_feature=("--input_binary_file=./data/upsert_sink_input")
 type=("StreamChunk format")
 
 ${MC_PATH} mb minio/bucket
diff --git a/java/connector-node/python-client/.gitignore b/java/connector-node/python-client/.gitignore
index 600d2d33badf4..536c32383d754 100644
--- a/java/connector-node/python-client/.gitignore
+++ b/java/connector-node/python-client/.gitignore
@@ -1 +1,2 @@
-.vscode
\ No newline at end of file
+.vscode
+sink-client-venv/
diff --git a/java/connector-node/python-client/integration_tests.py b/java/connector-node/python-client/integration_tests.py
index b16b5eaf34ad4..909859afc218a 100644
--- a/java/connector-node/python-client/integration_tests.py
+++ b/java/connector-node/python-client/integration_tests.py
@@ -117,7 +117,7 @@ def load_stream_chunk_payload(input_file):
     return payloads
 
 
-def test_sink(prop, format, payload_input, table_schema, is_coordinated=False):
+def test_sink(prop, payload_input, table_schema, is_coordinated=False):
     sink_param = connector_service_pb2.SinkParam(
         sink_id=0,
         properties=prop,
@@ -128,7 +128,6 @@ def test_sink(prop, format, payload_input, table_schema, is_coordinated=False):
     request_list = [
         connector_service_pb2.SinkWriterStreamRequest(
             start=connector_service_pb2.SinkWriterStreamRequest.StartSink(
-                format=format,
                 sink_param=sink_param,
             )
         )
     ]
@@ -291,9 +290,6 @@ def test_stream_chunk_data_format(param):
     parser.add_argument(
         "--deltalake_sink", action="store_true", help="run deltalake sink test"
deltalake sink test" ) - parser.add_argument( - "--input_file", default="./data/sink_input.json", help="input data to run tests" - ) parser.add_argument( "--input_binary_file", default="./data/sink_input", @@ -302,29 +298,18 @@ def test_stream_chunk_data_format(param): parser.add_argument( "--es_sink", action="store_true", help="run elasticsearch sink test" ) - parser.add_argument( - "--data_format_use_json", default=True, help="choose json or streamchunk" - ) args = parser.parse_args() - use_json = args.data_format_use_json == True or args.data_format_use_json == "True" - if use_json: - payload = load_json_payload(args.input_file) - format = connector_service_pb2.SinkPayloadFormat.JSON - else: - payload = load_stream_chunk_payload(args.input_binary_file) - format = connector_service_pb2.SinkPayloadFormat.STREAM_CHUNK + payload = load_stream_chunk_payload(args.input_binary_file) # stream chunk format if args.stream_chunk_format_test: param = { - "format": format, "payload_input": payload, "table_schema": make_mock_schema_stream_chunk(), } test_stream_chunk_data_format(param) param = { - "format": format, "payload_input": payload, "table_schema": make_mock_schema(), } @@ -337,7 +322,5 @@ def test_stream_chunk_data_format(param): test_deltalake_sink(param) if args.es_sink: test_elasticsearch_sink(param) - - # json format if args.upsert_iceberg_sink: test_upsert_iceberg_sink(param) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java deleted file mode 100644 index c941b09efe95c..0000000000000 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
-
-package com.risingwave.connector;
-
-import static io.grpc.Status.INVALID_ARGUMENT;
-
-import com.google.gson.Gson;
-import com.risingwave.connector.api.TableSchema;
-import com.risingwave.connector.api.sink.*;
-import com.risingwave.proto.ConnectorServiceProto;
-import com.risingwave.proto.ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch.JsonPayload;
-import com.risingwave.proto.Data;
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.util.Base64;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class JsonDeserializer implements Deserializer {
-    private final TableSchema tableSchema;
-
-    public JsonDeserializer(TableSchema tableSchema) {
-        this.tableSchema = tableSchema;
-    }
-
-    // Encoding here should be consistent with `datum_to_json_object()` in
-    // src/connector/src/sink/mod.rs
-    @Override
-    public CloseableIterable<SinkRow> deserialize(
-            ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch writeBatch) {
-        if (!writeBatch.hasJsonPayload()) {
-            throw INVALID_ARGUMENT
-                    .withDescription("expected JsonPayload, got " + writeBatch.getPayloadCase())
-                    .asRuntimeException();
-        }
-        JsonPayload jsonPayload = writeBatch.getJsonPayload();
-        return new TrivialCloseIterable<>(
-                jsonPayload.getRowOpsList().stream()
-                        .map(
-                                rowOp -> {
-                                    Map columnValues =
-                                            new Gson().fromJson(rowOp.getLine(), Map.class);
-                                    Object[] values = new Object[columnValues.size()];
-                                    for (String columnName : tableSchema.getColumnNames()) {
-                                        if (!columnValues.containsKey(columnName)) {
-                                            throw INVALID_ARGUMENT
-                                                    .withDescription(
-                                                            "column "
-                                                                    + columnName
-                                                                    + " not found in json")
-                                                    .asRuntimeException();
-                                        }
-                                        Data.DataType.TypeName typeName =
-                                                tableSchema.getColumnType(columnName);
-                                        values[tableSchema.getColumnIndex(columnName)] =
-                                                validateJsonDataTypes(
-                                                        typeName, columnValues.get(columnName));
-                                    }
-                                    return (SinkRow) new ArraySinkRow(rowOp.getOpType(), values);
-                                })
-                        .collect(Collectors.toList()));
-    }
-
-    private static Long castLong(Object value) {
-        if (value instanceof Integer) {
-            return ((Integer) value).longValue();
-        } else if (value instanceof Double) {
-            double d = (Double) value;
-            if (d % 1.0 != 0.0) {
-
-                throw io.grpc.Status.INVALID_ARGUMENT
-                        .withDescription(
-                                "unable to cast into long from non-integer double value: " + d)
-                        .asRuntimeException();
-            }
-            return ((Double) value).longValue();
-        } else if (value instanceof Long) {
-            return (Long) value;
-        } else if (value instanceof Short) {
-            return ((Short) value).longValue();
-        } else if (value instanceof Float) {
-            double f = (Float) value;
-            if (f % 1.0 != 0.0) {
-
-                throw io.grpc.Status.INVALID_ARGUMENT
-                        .withDescription(
-                                "unable to cast into long from non-integer float value: " + f)
-                        .asRuntimeException();
-            }
-            return ((Float) value).longValue();
-        } else {
-            throw io.grpc.Status.INVALID_ARGUMENT
-                    .withDescription("unable to cast into long from " + value.getClass())
-                    .asRuntimeException();
-        }
-    }
-
-    private static Double castDouble(Object value) {
-        if (value instanceof Double) {
-            return (Double) value;
-        } else if (value instanceof Float) {
-            return ((Float) value).doubleValue();
-        } else {
-            throw io.grpc.Status.INVALID_ARGUMENT
-                    .withDescription("unable to cast into double from " + value.getClass())
-                    .asRuntimeException();
-        }
-    }
-
-    private static BigDecimal castDecimal(Object value) {
-        if (value instanceof String) {
-            // FIXME(eric): See `datum_to_json_object()` in src/connector/src/sink/mod.rs
-            return new BigDecimal((String) value);
-        } else if (value instanceof BigDecimal) {
-            return (BigDecimal) value;
-        } else {
-            throw io.grpc.Status.INVALID_ARGUMENT
-                    .withDescription("unable to cast into double from " + value.getClass())
-                    .asRuntimeException();
-        }
-    }
-
-    private static LocalTime castTime(Object value) {
-        try {
-            Long milli = castLong(value);
-            return LocalTime.ofNanoOfDay(milli * 1_000_000L);
-        } catch (RuntimeException e) {
-            throw io.grpc.Status.INVALID_ARGUMENT
-                    .withDescription("unable to cast into time from " + value.getClass())
-                    .asRuntimeException();
-        }
-    }
-
-    private static LocalDate castDate(Object value) {
-        try {
-            Long days = castLong(value);
-            return LocalDate.ofEpochDay(days);
-        } catch (RuntimeException e) {
-            throw io.grpc.Status.INVALID_ARGUMENT
-                    .withDescription("unable to cast into date from " + value.getClass())
-                    .asRuntimeException();
-        }
-    }
-
-    private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
-        // value might be null
-        if (value == null) {
-            return null;
-        }
-        switch (typeName) {
-            case INT16:
-                return castLong(value).shortValue();
-            case INT32:
-                return castLong(value).intValue();
-            case INT64:
-                return castLong(value);
-            case VARCHAR:
-                if (!(value instanceof String)) {
-                    throw io.grpc.Status.INVALID_ARGUMENT
-                            .withDescription("Expected string, got " + value.getClass())
-                            .asRuntimeException();
-                }
-                return value;
-            case DOUBLE:
-                return castDouble(value);
-            case FLOAT:
-                return castDouble(value).floatValue();
-            case DECIMAL:
-                return castDecimal(value);
-            case BOOLEAN:
-                if (!(value instanceof Boolean)) {
-                    throw io.grpc.Status.INVALID_ARGUMENT
-                            .withDescription("Expected boolean, got " + value.getClass())
-                            .asRuntimeException();
-                }
-                return value;
-            case TIMESTAMP:
-                if (!(value instanceof String)) {
-                    throw io.grpc.Status.INVALID_ARGUMENT
-                            .withDescription(
-                                    "Expected timestamp in string, got " + value.getClass())
-                            .asRuntimeException();
-                }
-                return LocalDateTime.parse((String) value);
-            case TIMESTAMPTZ:
-                if (!(value instanceof String)) {
-                    throw io.grpc.Status.INVALID_ARGUMENT
-                            .withDescription(
-                                    "Expected timestamptz in string, got " + value.getClass())
-                            .asRuntimeException();
-                }
-                return OffsetDateTime.parse((String) value);
-            case TIME:
-                return castTime(value);
-            case DATE:
-                return castDate(value);
-            case INTERVAL:
-                if (!(value instanceof String)) {
-                    throw io.grpc.Status.INVALID_ARGUMENT
-                            .withDescription("Expected interval, got " + value.getClass())
-                            .asRuntimeException();
-                }
-                return value;
-            case JSONB:
-                if (!(value instanceof String)) {
-                    throw io.grpc.Status.INVALID_ARGUMENT
-                            .withDescription("Expected jsonb, got " + value.getClass())
-                            .asRuntimeException();
-                }
-                return value;
-            case BYTEA:
-                if (!(value instanceof String)) {
-                    throw io.grpc.Status.INVALID_ARGUMENT
-                            .withDescription("Expected bytea, got " + value.getClass())
-                            .asRuntimeException();
-                }
-                return Base64.getDecoder().decode((String) value);
-            case LIST:
-                if (!(value instanceof java.util.ArrayList)) {
-                    throw io.grpc.Status.INVALID_ARGUMENT
-                            .withDescription("Expected list, got " + value.getClass())
-                            .asRuntimeException();
-                }
-                return ((java.util.ArrayList) value).toArray();
-            default:
-                throw io.grpc.Status.INVALID_ARGUMENT
-                        .withDescription("unsupported type " + typeName)
-                        .asRuntimeException();
-        }
-    }
-}
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java
index cd61da38d6cb5..53dfe326fbd9d 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java
@@ -206,19 +206,7 @@ private void bindSink(ConnectorServiceProto.SinkWriterStreamRequest.StartSink startSink) {
         String connectorName = getConnectorName(sinkParam);
         SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
         sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap());
-        switch (startSink.getFormat()) {
-            case FORMAT_UNSPECIFIED:
-            case UNRECOGNIZED:
-                throw INVALID_ARGUMENT
-                        .withDescription("should specify payload format in request")
-                        .asRuntimeException();
-            case JSON:
-                deserializer = new JsonDeserializer(tableSchema);
-                break;
-            case STREAM_CHUNK:
-                deserializer = new StreamChunkDeserializer(tableSchema);
-                break;
-        }
+        deserializer = new StreamChunkDeserializer(tableSchema);
         this.connectorName = connectorName.toUpperCase();
         ConnectorNodeMetrics.incActiveSinkConnections(connectorName, "node1");
     }
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/DeserializerTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/DeserializerTest.java
deleted file mode 100644
index 9284a2ef8fd20..0000000000000
--- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/DeserializerTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.risingwave.connector.sink;
-
-import com.risingwave.connector.JsonDeserializer;
-import com.risingwave.connector.TestUtils;
-import com.risingwave.connector.api.sink.SinkRow;
-import com.risingwave.proto.ConnectorServiceProto;
-import com.risingwave.proto.ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch.JsonPayload;
-import com.risingwave.proto.Data;
-import junit.framework.TestCase;
-
-public class DeserializerTest extends TestCase {
-    public void testJsonDeserializer() {
-        JsonDeserializer deserializer = new JsonDeserializer(TestUtils.getMockTableSchema());
-        JsonPayload jsonPayload =
-                JsonPayload.newBuilder()
-                        .addRowOps(
-                                JsonPayload.RowOp.newBuilder()
-                                        .setOpType(Data.Op.INSERT)
-                                        .setLine("{\"id\": 1, \"name\": \"John\"}")
-                                        .build())
-                        .build();
-        ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch writeBatch =
-                ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch.newBuilder()
-                        .setJsonPayload(jsonPayload)
-                        .build();
-        SinkRow outcome = deserializer.deserialize(writeBatch).iterator().next();
-        assertEquals(outcome.get(0), 1);
-        assertEquals(outcome.get(1), "John");
-    }
-}
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java
index f0dcc4c1c4930..885fc7eb927a3 100644
--- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java
+++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java
@@ -14,10 +14,10 @@
 
 package com.risingwave.connector.sink;
 
+import com.google.protobuf.ByteString;
 import com.risingwave.connector.SinkWriterStreamObserver;
 import com.risingwave.connector.TestUtils;
 import com.risingwave.proto.ConnectorServiceProto;
-import com.risingwave.proto.Data.Op;
 import io.grpc.stub.StreamObserver;
 import java.util.Map;
 import org.junit.Assert;
@@ -94,7 +94,6 @@ public void testOnNext_syncValidation() {
                         .setStart(
                                 ConnectorServiceProto.SinkWriterStreamRequest.StartSink.newBuilder()
                                         .setSinkParam(fileSinkParam)
-                                        .setFormat(ConnectorServiceProto.SinkPayloadFormat.JSON)
                                         .build())
                         .build();
         ConnectorServiceProto.SinkWriterStreamRequest firstSync =
@@ -138,7 +137,6 @@ public void testOnNext_startEpochValidation() {
                         .setStart(
                                 ConnectorServiceProto.SinkWriterStreamRequest.StartSink.newBuilder()
                                         .setSinkParam(fileSinkParam)
-                                        .setFormat(ConnectorServiceProto.SinkPayloadFormat.JSON)
                                         .build())
                         .build();
         ConnectorServiceProto.SinkWriterStreamRequest firstSync =
@@ -156,6 +154,8 @@ public void testOnNext_startEpochValidation() {
         sinkWriterStreamObserver.onNext(firstSync);
     }
 
+    // WARN! This test is skipped in the CI pipeline; see
+    // `.github/workflows/connector-node-integration.yml`.
     @Test
     public void testOnNext_writeValidation() {
         SinkWriterStreamObserver sinkWriterStreamObserver;
@@ -164,10 +164,16 @@ public void testOnNext_writeValidation() {
                 ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
                         .setStart(
                                 ConnectorServiceProto.SinkWriterStreamRequest.StartSink.newBuilder()
-                                        .setFormat(ConnectorServiceProto.SinkPayloadFormat.JSON)
                                         .setSinkParam(fileSinkParam))
                         .build();
 
+        // Encoded StreamChunk: 1 'test'
+        byte[] data1 =
+                new byte[] {
+                    8, 1, 18, 1, 1, 26, 20, 8, 2, 18, 6, 8, 1, 18, 2, 1, 1, 26, 8, 8, 1, 18, 4, 0,
+                    0, 0, 1, 26, 42, 8, 6, 18, 6, 8, 1, 18, 2, 1, 1, 26, 20, 8, 1, 18, 16, 0, 0, 0,
+                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 26, 8, 8, 1, 18, 4, 116, 101, 115, 116
+                };
         ConnectorServiceProto.SinkWriterStreamRequest firstWrite =
                 ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
                         .setWriteBatch(
@@ -175,19 +181,11 @@ public void testOnNext_writeValidation() {
                                 ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch
                                         .newBuilder()
                                         .setEpoch(0)
                                         .setBatchId(1)
-                                        .setJsonPayload(
-                                                ConnectorServiceProto.SinkWriterStreamRequest
-                                                        .WriteBatch.JsonPayload.newBuilder()
-                                                        .addRowOps(
-                                                                ConnectorServiceProto
-                                                                        .SinkWriterStreamRequest
-                                                                        .WriteBatch.JsonPayload
-                                                                        .RowOp.newBuilder()
-                                                                        .setOpType(Op.INSERT)
-                                                                        .setLine(
-                                                                                "{\"id\": 1, \"name\": \"test\"}")
-                                                                        .build()))
-                                        .build())
+                                        .setStreamChunkPayload(
+                                                ConnectorServiceProto.SinkWriterStreamRequest
+                                                        .WriteBatch.StreamChunkPayload.newBuilder()
+                                                        .setBinaryData(ByteString.copyFrom(data1))
+                                                        .build()))
                         .build();
 
         ConnectorServiceProto.SinkWriterStreamRequest firstSync =
@@ -199,6 +197,13 @@ public void testOnNext_writeValidation() {
                 ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
                         .setBarrier(
                                 ConnectorServiceProto.SinkWriterStreamRequest.Barrier.newBuilder()
                                         .setEpoch(0)
                                         .build())
                         .build();
 
+        // Encoded StreamChunk: 2 'test'
+        byte[] data2 =
+                new byte[] {
+                    8, 1, 18, 1, 1, 26, 20, 8, 2, 18, 6, 8, 1, 18, 2, 1, 1, 26, 8, 8, 1, 18, 4, 0,
+                    0, 0, 2, 26, 42, 8, 6, 18, 6, 8, 1, 18, 2, 1, 1, 26, 20, 8, 1, 18, 16, 0, 0, 0,
+                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 26, 8, 8, 1, 18, 4, 116, 101, 115, 116
+                };
         ConnectorServiceProto.SinkWriterStreamRequest secondWrite =
                 ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
                         .setWriteBatch(
                                 ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch
@@ -206,19 +211,11 @@ public void testOnNext_writeValidation() {
                                         .newBuilder()
                                         .setEpoch(1)
                                         .setBatchId(2)
-                                        .setJsonPayload(
-                                                ConnectorServiceProto.SinkWriterStreamRequest
-                                                        .WriteBatch.JsonPayload.newBuilder()
-                                                        .addRowOps(
-                                                                ConnectorServiceProto
-                                                                        .SinkWriterStreamRequest
-                                                                        .WriteBatch.JsonPayload
-                                                                        .RowOp.newBuilder()
-                                                                        .setOpType(Op.INSERT)
-                                                                        .setLine(
-                                                                                "{\"id\": 2, \"name\": \"test\"}")
-                                                                        .build()))
-                                        .build())
+                                        .setStreamChunkPayload(
+                                                ConnectorServiceProto.SinkWriterStreamRequest
+                                                        .WriteBatch.StreamChunkPayload.newBuilder()
+                                                        .setBinaryData(ByteString.copyFrom(data2))
+                                                        .build()))
                         .build();
 
         ConnectorServiceProto.SinkWriterStreamRequest secondWriteWrongEpoch =
                 ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
                         .setWriteBatch(
                                 ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch
@@ -228,19 +225,11 @@ public void testOnNext_writeValidation() {
                                         .newBuilder()
                                         .setEpoch(2)
                                         .setBatchId(3)
-                                        .setJsonPayload(
-                                                ConnectorServiceProto.SinkWriterStreamRequest
-                                                        .WriteBatch.JsonPayload.newBuilder()
-                                                        .addRowOps(
-                                                                ConnectorServiceProto
-                                                                        .SinkWriterStreamRequest
-                                                                        .WriteBatch.JsonPayload
-                                                                        .RowOp.newBuilder()
-                                                                        .setOpType(Op.INSERT)
-                                                                        .setLine(
-                                                                                "{\"id\": 2, \"name\": \"test\"}")
-                                                                        .build()))
-                                        .build())
+                                        .setStreamChunkPayload(
+                                                ConnectorServiceProto.SinkWriterStreamRequest
+                                                        .WriteBatch.StreamChunkPayload.newBuilder()
+                                                        .setBinaryData(ByteString.copyFrom(data2))
+                                                        .build()))
                         .build();
 
         boolean exceptionThrown = false;
         try {
@@ -251,7 +240,10 @@ public void testOnNext_writeValidation() {
             sinkWriterStreamObserver.onNext(firstWrite);
         } catch (RuntimeException e) {
             exceptionThrown = true;
-            Assert.assertTrue(e.getMessage().toLowerCase().contains("batch id"));
+            if (!e.getMessage().toLowerCase().contains("batch id")) {
+                e.printStackTrace();
+                Assert.fail("Expected `batch id`, but got " + e.getMessage());
+            }
         }
         if (!exceptionThrown) {
             Assert.fail("Expected exception not thrown: `invalid batch id`");
         }
@@ -267,7 +259,10 @@ public void testOnNext_writeValidation() {
             sinkWriterStreamObserver.onNext(secondWriteWrongEpoch);
         } catch (RuntimeException e) {
             exceptionThrown = true;
-            Assert.assertTrue(e.getMessage().toLowerCase().contains("invalid epoch"));
+            if (!e.getMessage().toLowerCase().contains("invalid epoch")) {
+                e.printStackTrace();
+                Assert.fail("Expected `invalid epoch`, but got " + e.getMessage());
+            }
         }
         if (!exceptionThrown) {
             Assert.fail(
diff --git a/proto/connector_service.proto b/proto/connector_service.proto
index 4deb0d6fb6096..da2c2b88087ea 100644
--- a/proto/connector_service.proto
+++ b/proto/connector_service.proto
@@ -4,7 +4,6 @@ package connector_service;
 
 import "catalog.proto";
 import "common.proto";
-import "data.proto";
 import "plan_common.proto";
 
 option java_outer_classname = "ConnectorServiceProto";
@@ -30,34 +29,21 @@ message SinkParam {
   string sink_name = 8;
 }
 
-enum SinkPayloadFormat {
-  FORMAT_UNSPECIFIED = 0;
-  JSON = 1;
-  STREAM_CHUNK = 2;
-}
-
 message SinkWriterStreamRequest {
   message StartSink {
     SinkParam sink_param = 1;
-    SinkPayloadFormat format = 2;
+    // Deprecated: SinkPayloadFormat format = 2;
+    reserved "format";
+    reserved 2;
     TableSchema payload_schema = 3;
   }
 
   message WriteBatch {
-    message JsonPayload {
-      message RowOp {
-        data.Op op_type = 1;
-        string line = 2;
-      }
-      repeated RowOp row_ops = 1;
-    }
-
     message StreamChunkPayload {
       bytes binary_data = 1;
     }
 
     oneof payload {
-      JsonPayload json_payload = 1;
       StreamChunkPayload stream_chunk_payload = 2;
       // This is a reference pointer to a StreamChunk. The StreamChunk is owned
      // by the JniSinkWriterStreamRequest, which should handle the release of StreamChunk.
@@ -65,6 +51,10 @@ message SinkWriterStreamRequest {
       int64 stream_chunk_ref_pointer = 5;
     }
 
+    // Deprecated in oneof payload: JsonPayload json_payload = 1;
+    reserved "json_payload";
+    reserved 1;
+
     uint64 batch_id = 3;
     uint64 epoch = 4;
   }
diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs
index 033cb47ba5a24..cfb506fa2efab 100644
--- a/src/bench/sink_bench/main.rs
+++ b/src/bench/sink_bench/main.rs
@@ -49,7 +49,6 @@ use risingwave_connector::source::datagen::{
 use risingwave_connector::source::{
     Column, DataType, SourceContext, SourceEnumeratorContext, SplitEnumerator, SplitReader,
 };
-use risingwave_pb::connector_service::SinkPayloadFormat;
 use risingwave_stream::executor::test_utils::prelude::ColumnDesc;
 use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError};
 use serde::{Deserialize, Deserializer};
@@ -472,9 +471,8 @@ async fn main() {
         sink_from_name: "not_need_set".to_string(),
     };
     let sink = build_sink(sink_param).unwrap();
-    let mut sink_writer_param = SinkWriterParam::for_test();
+    let sink_writer_param = SinkWriterParam::for_test();
     println!("Start Sink Bench!, Wait {:?}s", BENCH_TIME);
-    sink_writer_param.connector_params.sink_payload_format = SinkPayloadFormat::StreamChunk;
     tokio::spawn(async move {
         dispatch_sink!(sink, sink, {
             consume_log_stream(sink, mock_range_log_reader, sink_writer_param).boxed()
diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs
index 62030a3c84aff..819aba03d865e 100644
--- a/src/cmd_all/src/standalone.rs
+++ b/src/cmd_all/src/standalone.rs
@@ -356,7 +356,6 @@ mod test {
                         http://127.0.0.1:5690/,
                     ],
                 ),
-                connector_rpc_sink_payload_format: None,
                 config_path: "src/config/test.toml",
                 total_memory_bytes: 34359738368,
                 reserved_memory_bytes: None,
diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs
index 344e0c781eb5e..2c25f7808d93d 100644
--- a/src/compute/src/lib.rs
+++ b/src/compute/src/lib.rs
@@ -76,10 +76,6 @@ pub struct ComputeNodeOpts {
     #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
     pub meta_address: MetaAddressStrategy,
 
-    /// Payload format of connector sink rpc
-    #[clap(long, env = "RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT")]
-    pub connector_rpc_sink_payload_format: Option<String>,
-
     /// The path of `risingwave.toml` configuration file.
     ///
     /// If empty, default configuration values will be used.
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index 07dee5dfa4c8c..b9cd6fd09f751 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -42,7 +42,6 @@ use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
 use risingwave_dml::dml_manager::DmlManager;
 use risingwave_pb::common::WorkerType;
 use risingwave_pb::compute::config_service_server::ConfigServiceServer;
-use risingwave_pb::connector_service::SinkPayloadFormat;
 use risingwave_pb::health::health_server::HealthServer;
 use risingwave_pb::meta::add_worker_node_request::Property;
 use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
@@ -334,28 +333,9 @@ pub async fn compute_node_serve(
         config.server.metrics_level,
     );
 
-    info!(
-        "connector param: payload_format={:?}",
-        opts.connector_rpc_sink_payload_format
-    );
-
-    let connector_params = risingwave_connector::ConnectorParams {
-        sink_payload_format: match opts.connector_rpc_sink_payload_format.as_deref() {
-            None | Some("stream_chunk") => SinkPayloadFormat::StreamChunk,
-            Some("json") => SinkPayloadFormat::Json,
-            _ => {
-                unreachable!(
-                    "invalid sink payload format: {:?}. Should be either json or stream_chunk",
-                    opts.connector_rpc_sink_payload_format
-                )
-            }
-        },
-    };
-
     // Initialize the streaming environment.
     let stream_env = StreamEnvironment::new(
         advertise_addr.clone(),
-        connector_params,
         stream_config,
         worker_id,
         state_store,
diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs
index 70de9f8561a76..c866a68b298d6 100644
--- a/src/connector/src/lib.rs
+++ b/src/connector/src/lib.rs
@@ -41,7 +41,6 @@ use std::time::Duration;
 
 use duration_str::parse_std;
-use risingwave_pb::connector_service::SinkPayloadFormat;
 use serde::de;
 
 pub mod aws_utils;
@@ -64,19 +63,6 @@ pub use with_options::WithPropertiesExt;
 #[cfg(test)]
 mod with_options_test;
 
-#[derive(Clone, Debug, Default)]
-pub struct ConnectorParams {
-    pub sink_payload_format: SinkPayloadFormat,
-}
-
-impl ConnectorParams {
-    pub fn new(sink_payload_format: SinkPayloadFormat) -> Self {
-        Self {
-            sink_payload_format,
-        }
-    }
-}
-
 pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
 where
     D: de::Deserializer<'de>,
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index d281a97ef6c26..2ef4bb953b67e 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -71,7 +71,6 @@ use crate::sink::catalog::desc::SinkDesc;
 use crate::sink::catalog::{SinkCatalog, SinkId};
 use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
 use crate::sink::writer::SinkWriter;
-use crate::ConnectorParams;
 
 const BOUNDED_CHANNEL_SIZE: usize = 16;
 #[macro_export]
@@ -288,7 +287,6 @@ impl SinkMetrics {
 
 #[derive(Clone)]
 pub struct SinkWriterParam {
-    pub connector_params: ConnectorParams,
     pub executor_id: u64,
     pub vnode_bitmap: Option<Bitmap>,
     pub meta_client: Option<SinkMetaClient>,
@@ -326,7 +324,6 @@ impl SinkMetaClient {
 impl SinkWriterParam {
     pub fn for_test() -> Self {
         SinkWriterParam {
-            connector_params: Default::default(),
             executor_id: Default::default(),
             vnode_bitmap: Default::default(),
             meta_client: Default::default(),
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index 3ab7e90a69367..679434d08b194 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -42,8 +42,8 @@ use risingwave_pb::connector_service::sink_writer_stream_request::{
 use risingwave_pb::connector_service::{
     sink_coordinator_stream_request, sink_coordinator_stream_response, sink_writer_stream_response,
     PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata,
-    SinkPayloadFormat, SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema,
-    ValidateSinkRequest, ValidateSinkResponse,
+    SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest,
+    ValidateSinkResponse,
 };
 use risingwave_rpc_client::error::RpcError;
 use risingwave_rpc_client::{
@@ -68,7 +68,6 @@ use crate::sink::{
     DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError,
     SinkLogReader, SinkMetrics, SinkParam, SinkWriterParam,
 };
-use crate::ConnectorParams;
 
 macro_rules! def_remote_sink {
     () => {
@@ -82,7 +81,6 @@ macro_rules! def_remote_sink {
             { HttpJava, HttpJavaSink, "http" }
         }
     };
-    () => {};
    ({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr }) => {
        #[derive(Debug)]
        pub struct $variant_name;
@@ -283,7 +281,7 @@ impl RemoteLogSinker {
            request_sender,
            response_stream,
        } = EmbeddedConnectorClient::new()?
-            .start_sink_writer_stream(payload_schema, sink_proto, SinkPayloadFormat::StreamChunk)
+            .start_sink_writer_stream(payload_schema, sink_proto)
             .await?;
 
         let sink_metrics = writer_param.sink_metrics;
@@ -547,12 +545,8 @@ impl Sink for CoordinatedRemoteSink {
                         "sink needs coordination should not have singleton input"
                     ))
                 })?,
-            CoordinatedRemoteSinkWriter::new(
-                self.param.clone(),
-                writer_param.connector_params,
-                writer_param.sink_metrics.clone(),
-            )
-            .await?,
+            CoordinatedRemoteSinkWriter::new(self.param.clone(), writer_param.sink_metrics.clone())
+                .await?,
         )
         .await?
         .into_log_sinker(writer_param.sink_metrics))
@@ -572,18 +566,10 @@ pub struct CoordinatedRemoteSinkWriter {
 }
 
 impl CoordinatedRemoteSinkWriter {
-    pub async fn new(
-        param: SinkParam,
-        connector_params: ConnectorParams,
-        sink_metrics: SinkMetrics,
-    ) -> Result<Self> {
+    pub async fn new(param: SinkParam, sink_metrics: SinkMetrics) -> Result<Self> {
         let sink_proto = param.to_proto();
         let stream_handle = EmbeddedConnectorClient::new()?
-            .start_sink_writer_stream(
-                sink_proto.table_schema.clone(),
-                sink_proto,
-                connector_params.sink_payload_format,
-            )
+            .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto)
             .await?;
 
         Ok(Self {
@@ -717,13 +703,11 @@ impl EmbeddedConnectorClient {
         &self,
         payload_schema: Option<TableSchema>,
         sink_proto: PbSinkParam,
-        sink_payload_format: SinkPayloadFormat,
     ) -> Result<SinkWriterStreamHandle<JniSinkWriterStreamRequest>> {
         let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
             SinkWriterStreamRequest {
                 request: Some(SinkRequest::Start(StartSink {
                     sink_param: Some(sink_proto),
-                    format: sink_payload_format as i32,
                     payload_schema,
                 })),
             },
diff --git a/src/connector/src/source/reader/desc.rs b/src/connector/src/source/reader/desc.rs
index 808ef1232c50c..cbd63a2a4906a 100644
--- a/src/connector/src/source/reader/desc.rs
+++ b/src/connector/src/source/reader/desc.rs
@@ -30,7 +30,6 @@ use crate::parser::additional_columns::add_partition_offset_cols;
 use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
 use crate::source::monitor::SourceMetrics;
 use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY};
-use crate::ConnectorParams;
 
 pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16;
@@ -59,7 +58,6 @@ pub struct SourceDescBuilder {
     row_id_index: Option<usize>,
     with_properties: HashMap<String, String>,
     source_info: PbStreamSourceInfo,
-    connector_params: ConnectorParams,
     connector_message_buffer_size: usize,
     pk_indices: Vec<usize>,
 }
@@ -72,7 +70,6 @@ impl SourceDescBuilder {
         row_id_index: Option<usize>,
         with_properties: HashMap<String, String>,
         source_info: PbStreamSourceInfo,
-        connector_params: ConnectorParams,
         connector_message_buffer_size: usize,
         pk_indices: Vec<usize>,
     ) -> Self {
@@ -82,7 +79,6 @@ impl SourceDescBuilder {
             row_id_index,
             with_properties,
             source_info,
-            connector_params,
             connector_message_buffer_size,
             pk_indices,
         }
@@ -223,7 +219,6 @@ pub mod test_utils {
             row_id_index,
             with_properties,
             source_info,
-            connector_params: Default::default(),
             connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
             pk_indices,
         }
diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs
index f4a87e3c1f5b4..3042394b25877 100644
--- a/src/rpc_client/src/connector_client.rs
+++ b/src/rpc_client/src/connector_client.rs
@@ -278,7 +278,6 @@ impl ConnectorClient {
         &self,
         payload_schema: Option<TableSchema>,
         sink_proto: PbSinkParam,
-        sink_payload_format: SinkPayloadFormat,
     ) -> Result<SinkWriterStreamHandle> {
         let mut rpc_client = self.rpc_client.clone();
         let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
@@ -286,7 +285,6 @@ impl ConnectorClient {
                 request: Some(SinkRequest::Start(StartSink {
                     payload_schema,
                     sink_param: Some(sink_proto),
-                    format: sink_payload_format as i32,
                 })),
             },
             |rx| async move {
diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs
index 541dce562ba04..5e77be7beb7a0 100644
--- a/src/stream/src/from_proto/sink.rs
+++ b/src/stream/src/from_proto/sink.rs
@@ -180,7 +180,6 @@ impl ExecutorBuilder for SinkExecutorBuilder {
         );
 
         let sink_write_param = SinkWriterParam {
-            connector_params: params.env.connector_params(),
             executor_id: params.executor_id,
             vnode_bitmap: params.vnode_bitmap.clone(),
             meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient),
diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs
index 6dd2be7263c29..1951365a47eed 100644
--- a/src/stream/src/from_proto/source/fs_fetch.rs
+++ b/src/stream/src/from_proto/source/fs_fetch.rs
@@ -53,7 +53,6 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
             source.row_id_index.map(|x| x as _),
             source.with_properties.clone(),
             source_info.clone(),
-            params.env.connector_params(),
             params.env.config().developer.connector_message_buffer_size,
             params.info.pk_indices.clone(),
         );
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 0013792d51326..8c00fb0a50830 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -118,7 +118,6 @@ pub fn create_source_desc_builder(
         row_id_index.map(|x| x as _),
         with_properties,
         source_info,
-        params.env.connector_params(),
         params.env.config().developer.connector_message_buffer_size,
         // `pk_indices` is used to ensure that a message will be skipped instead of parsed
         // with null pk when the pk column is missing.
diff --git a/src/stream/src/task/env.rs b/src/stream/src/task/env.rs
index 9a0b26f25f0c5..a47eb8279224c 100644
--- a/src/stream/src/task/env.rs
+++ b/src/stream/src/task/env.rs
@@ -19,7 +19,6 @@ use risingwave_common::config::StreamingConfig;
 use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
 use risingwave_common::util::addr::HostAddr;
 use risingwave_connector::source::monitor::SourceMetrics;
-use risingwave_connector::ConnectorParams;
 use risingwave_dml::dml_manager::DmlManagerRef;
 use risingwave_rpc_client::MetaClient;
 use risingwave_storage::StateStoreImpl;
@@ -33,9 +32,6 @@ pub struct StreamEnvironment {
     /// Endpoint the stream manager listens on.
     server_addr: HostAddr,
 
-    /// Parameters used by connector nodes.
-    connector_params: ConnectorParams,
-
     /// Streaming related configurations.
     config: Arc<StreamingConfig>,
@@ -65,7 +61,6 @@ impl StreamEnvironment {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         server_addr: HostAddr,
-        connector_params: ConnectorParams,
         config: Arc<StreamingConfig>,
         worker_id: WorkerNodeId,
         state_store: StateStoreImpl,
@@ -76,7 +71,6 @@ impl StreamEnvironment {
     ) -> Self {
         StreamEnvironment {
             server_addr,
-            connector_params,
             config,
             worker_id,
             state_store,
@@ -93,11 +87,9 @@ impl StreamEnvironment {
     pub fn for_test() -> Self {
         use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
         use risingwave_dml::dml_manager::DmlManager;
-        use risingwave_pb::connector_service::SinkPayloadFormat;
         use risingwave_storage::monitor::MonitoredStorageMetrics;
         StreamEnvironment {
             server_addr: "127.0.0.1:5688".parse().unwrap(),
-            connector_params: ConnectorParams::new(SinkPayloadFormat::Json),
             config: Arc::new(StreamingConfig::default()),
             worker_id: WorkerNodeId::default(),
             state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
@@ -127,10 +119,6 @@ impl StreamEnvironment {
         self.state_store.clone()
     }
 
-    pub fn connector_params(&self) -> ConnectorParams {
-        self.connector_params.clone()
-    }
-
     pub fn dml_manager_ref(&self) -> DmlManagerRef {
         self.dml_manager.clone()
     }
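
Note: after this change, `StartSink` no longer carries a payload format (field 2 is reserved) and `WriteBatch` accepts only StreamChunk-encoded payloads. The following is a rough Python sketch of the resulting client flow, mirroring `integration_tests.py` above; it is not code from this patch. `connector_service_pb2` is the module generated from `proto/connector_service.proto`, `build_writer_requests` is a hypothetical helper name, and `chunk_payloads` is assumed to be an iterable of `(batch_id, binary_data)` pairs whose bytes are pre-encoded StreamChunks, such as the `./data/sink_input` test fixtures.

    import connector_service_pb2

    def build_writer_requests(sink_param, chunk_payloads):
        # The stream opens with a bare StartSink: there is no `format` field anymore.
        requests = [
            connector_service_pb2.SinkWriterStreamRequest(
                start=connector_service_pb2.SinkWriterStreamRequest.StartSink(
                    sink_param=sink_param,
                )
            )
        ]
        # Every batch wraps pre-encoded StreamChunk bytes. epoch and batch_id must
        # advance monotonically; the server-side observer rejects violations
        # (see testOnNext_writeValidation above).
        for epoch, (batch_id, binary_data) in enumerate(chunk_payloads):
            write_batch = connector_service_pb2.SinkWriterStreamRequest.WriteBatch(
                stream_chunk_payload=connector_service_pb2.SinkWriterStreamRequest.WriteBatch.StreamChunkPayload(
                    binary_data=binary_data
                ),
                batch_id=batch_id,
                epoch=epoch,
            )
            requests.append(
                connector_service_pb2.SinkWriterStreamRequest(write_batch=write_batch)
            )
        return requests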