fix: deprecate SinkPayloadFormat (#16723)
fuyufjh authored May 14, 2024
1 parent 76278e9 commit ff49514
Showing 22 changed files with 68 additions and 479 deletions.
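
In short: this commit removes the JSON payload path from the sink writer stream. `SinkWriterStreamRequest.StartSink` no longer carries a `format` field, and the connector node always interprets payloads as `STREAM_CHUNK`. A minimal before/after sketch from the client's side, modeled on the integration_tests.py diff below (the empty `properties` map is a placeholder; the message and field names follow the test code):

    import connector_service_pb2

    # Placeholder SinkParam; real tests pass connector-specific properties.
    sink_param = connector_service_pb2.SinkParam(sink_id=0, properties={})

    # Before: clients had to declare a payload format when starting a sink.
    #   start=connector_service_pb2.SinkWriterStreamRequest.StartSink(
    #       format=connector_service_pb2.SinkPayloadFormat.STREAM_CHUNK,
    #       sink_param=sink_param,
    #   )

    # After: no format field; stream chunk is the only payload format.
    start_request = connector_service_pb2.SinkWriterStreamRequest(
        start=connector_service_pb2.SinkWriterStreamRequest.StartSink(
            sink_param=sink_param,
        )
    )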
12 changes: 8 additions & 4 deletions .github/workflows/connector-node-integration.yml
@@ -2,11 +2,11 @@ name: Connector Node Integration Tests
 
 on:
   push:
-    branches: [main]
+    branches: [ main ]
   pull_request:
-    branches: [main]
+    branches: [ main ]
   merge_group:
-    types: [checks_requested]
+    types: [ checks_requested ]
 
 jobs:
   build:
@@ -42,4 +42,8 @@ jobs:
         echo "--- build connector node"
         cd ${RISINGWAVE_ROOT}/java
         # run unit test
-        mvn --batch-mode --update-snapshots clean package -Dno-build-rust
+        # WARN: `testOnNext_writeValidation` is skipped because it relies on Rust code to decode messages,
+        # while we don't build Rust code (`-Dno-build-rust`) here to save time
+        mvn --batch-mode --update-snapshots clean package -Dno-build-rust \
+          '-Dtest=!com.risingwave.connector.sink.SinkStreamObserverTest#testOnNext_writeValidation' \
+          -Dsurefire.failIfNoSpecifiedTests=false
6 changes: 3 additions & 3 deletions ci/scripts/connector-node-integration-test.sh
@@ -90,15 +90,15 @@ export PYTHONPATH=proto
 
 echo "--- running streamchunk data format integration tests"
 cd "${RISINGWAVE_ROOT}"/java/connector-node/python-client
-if python3 integration_tests.py --stream_chunk_format_test --input_binary_file="./data/stream_chunk_data" --data_format_use_json=False; then
+if python3 integration_tests.py --stream_chunk_format_test --input_binary_file="./data/stream_chunk_data"; then
     echo "StreamChunk data format test passed"
 else
     echo "StreamChunk data format test failed"
     exit 1
 fi
 
-sink_input_feature=("--input_binary_file=./data/sink_input --data_format_use_json=False")
-upsert_sink_input_feature=("--input_binary_file=./data/upsert_sink_input --data_format_use_json=False")
+sink_input_feature=("--input_binary_file=./data/sink_input")
+upsert_sink_input_feature=("--input_binary_file=./data/upsert_sink_input")
 type=("StreamChunk format")
 
 ${MC_PATH} mb minio/bucket
3 changes: 2 additions & 1 deletion java/connector-node/python-client/.gitignore
@@ -1 +1,2 @@
-.vscode
+.vscode
+sink-client-venv/
21 changes: 2 additions & 19 deletions java/connector-node/python-client/integration_tests.py
@@ -117,7 +117,7 @@ def load_stream_chunk_payload(input_file):
     return payloads
 
 
-def test_sink(prop, format, payload_input, table_schema, is_coordinated=False):
+def test_sink(prop, payload_input, table_schema, is_coordinated=False):
     sink_param = connector_service_pb2.SinkParam(
         sink_id=0,
         properties=prop,
@@ -128,7 +128,6 @@ def test_sink(prop, format, payload_input, table_schema, is_coordinated=False):
     request_list = [
         connector_service_pb2.SinkWriterStreamRequest(
             start=connector_service_pb2.SinkWriterStreamRequest.StartSink(
-                format=format,
                 sink_param=sink_param,
             )
         )
@@ -291,9 +290,6 @@ def test_stream_chunk_data_format(param):
     parser.add_argument(
         "--deltalake_sink", action="store_true", help="run deltalake sink test"
     )
-    parser.add_argument(
-        "--input_file", default="./data/sink_input.json", help="input data to run tests"
-    )
     parser.add_argument(
         "--input_binary_file",
         default="./data/sink_input",
@@ -302,29 +298,18 @@ def test_stream_chunk_data_format(param):
     parser.add_argument(
         "--es_sink", action="store_true", help="run elasticsearch sink test"
    )
-    parser.add_argument(
-        "--data_format_use_json", default=True, help="choose json or streamchunk"
-    )
     args = parser.parse_args()
-    use_json = args.data_format_use_json == True or args.data_format_use_json == "True"
-    if use_json:
-        payload = load_json_payload(args.input_file)
-        format = connector_service_pb2.SinkPayloadFormat.JSON
-    else:
-        payload = load_stream_chunk_payload(args.input_binary_file)
-        format = connector_service_pb2.SinkPayloadFormat.STREAM_CHUNK
+    payload = load_stream_chunk_payload(args.input_binary_file)
 
     # stream chunk format
     if args.stream_chunk_format_test:
         param = {
-            "format": format,
             "payload_input": payload,
             "table_schema": make_mock_schema_stream_chunk(),
         }
         test_stream_chunk_data_format(param)
 
     param = {
-        "format": format,
         "payload_input": payload,
         "table_schema": make_mock_schema(),
     }
@@ -337,7 +322,5 @@ def test_stream_chunk_data_format(param):
         test_deltalake_sink(param)
     if args.es_sink:
         test_elasticsearch_sink(param)
-
-    # json format
     if args.upsert_iceberg_sink:
         test_upsert_iceberg_sink(param)
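
With the JSON path gone, driving a sink test inside integration_tests.py reduces to loading a binary stream chunk payload and calling `test_sink` without a format argument. A hedged usage sketch (the `prop` dict is a hypothetical connector config; `load_stream_chunk_payload`, `make_mock_schema`, and `test_sink` are the functions touched above):

    payload = load_stream_chunk_payload("./data/sink_input")

    param = {
        "payload_input": payload,
        "table_schema": make_mock_schema(),
    }

    # New signature: test_sink(prop, payload_input, table_schema, is_coordinated=False)
    test_sink({"connector": "file", "output.path": "/tmp/sink-output"}, **param)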

This file was deleted.

@@ -206,19 +206,7 @@ private void bindSink(ConnectorServiceProto.SinkWriterStreamRequest.StartSink st
         String connectorName = getConnectorName(sinkParam);
         SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
         sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap());
-        switch (startSink.getFormat()) {
-            case FORMAT_UNSPECIFIED:
-            case UNRECOGNIZED:
-                throw INVALID_ARGUMENT
-                        .withDescription("should specify payload format in request")
-                        .asRuntimeException();
-            case JSON:
-                deserializer = new JsonDeserializer(tableSchema);
-                break;
-            case STREAM_CHUNK:
-                deserializer = new StreamChunkDeserializer(tableSchema);
-                break;
-        }
+        deserializer = new StreamChunkDeserializer(tableSchema);
         this.connectorName = connectorName.toUpperCase();
         ConnectorNodeMetrics.incActiveSinkConnections(connectorName, "node1");
     }
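
On the server side, `bindSink` now unconditionally installs a `StreamChunkDeserializer`, so every `WriteBatch` a client sends must carry a stream chunk payload. A sketch of building such a batch in Python, mirroring how the integration tests feed data (the `WriteBatch`/`StreamChunkPayload` field names are my reading of connector_service.proto at this revision; the epoch and batch_id values are placeholders):

    import connector_service_pb2

    def make_write_batch(binary_payload: bytes, epoch: int, batch_id: int):
        # binary_payload is a StreamChunk serialized on the Rust side;
        # the integration tests read these bytes from files like ./data/sink_input.
        return connector_service_pb2.SinkWriterStreamRequest(
            write_batch=connector_service_pb2.SinkWriterStreamRequest.WriteBatch(
                stream_chunk_payload=connector_service_pb2.SinkWriterStreamRequest.WriteBatch.StreamChunkPayload(
                    binary_data=binary_payload
                ),
                epoch=epoch,
                batch_id=batch_id,
            )
        )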
