Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update kafka-source paramters and ${} topic #2791

Open
wants to merge 1 commit into
base: release-5.8
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions zh_CN/data-integration/data-bridge-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ EMQX v5.7.2 引入了一项新功能,可以在 SQL 处理阶段将从设置的

4. 如果您想将从 Kafka Source `$bridges/kafka_consumer:<sourceName>` 转换的消息转发到 EMQX,请在 **SQL 编辑器**中输入以下语句。

注意:如果您想指定自己的 SQL 语法,请确保 `SELECT` 部分包含了稍后步骤中设置的消息重发布动作所需的所有字段。
注意:如果您想指定自己的 SQL 语法,请确保 `SELECT` 部分包含了稍后步骤中设置的消息重发布动作所需的所有字段。Kafka Source 的 Select 语句中可以使用 `ts_type`、`topic`、`ts`、`event`、`headers`、`key`、`metadata`、`value`、`timestamp`、`offset`、`node` 等字段。

```sql
SELECT
Expand All @@ -321,7 +321,7 @@ EMQX v5.7.2 引入了一项新功能,可以在 SQL 处理阶段将从设置的
- **Kafka 主题名称**:指定消费者 Source 将订阅的 Kafka 主题,以接收消息。
- **消费组 ID**:指定此 Source 的消费组标识符。如果未提供,系统将基于 Source 名称自动生成一个组 ID。
- **Key 编码模式** 和 **编码模式**:选择 Kafka 消息键和消息值的编码模式。
2. **偏移重置策略**:选择当没有消费者偏移量或偏移量变得无效时,Kafka 消费者开始从 Kafka 主题分区读取的偏移量重置策略。
6. **偏移重置策略**:选择当没有消费者偏移量或偏移量变得无效时,Kafka 消费者开始从 Kafka 主题分区读取的偏移量重置策略。

- 如果您希望消费者从最新偏移量开始读取消息,跳过消费者启动前产生的消息,请选择 `latest`。
- 如果您希望消费者从分区的开始读取消息,包括消费者启动前产生的消息,即读取主题中的所有历史数据,请选择 `earliest`。
Expand All @@ -332,10 +332,12 @@ EMQX v5.7.2 引入了一项新功能,可以在 SQL 处理阶段将从设置的
### 添加一个消息重发布动作

1. 选择**动作输出**选项卡并点击 + **添加动作**按钮来定义规则触发的操作。
1. 从**动作类型**下拉列表中选择**消息重发布**。
2. 在 **主题** 和 **Payload** 字段中,您可以输入您想重新发布的消息的主题和 payload。例如,对于此演示,输入 `t/1` 和 `${.}`。
3. 点击**添加**将此动作包含在规则中。
4. 回到**创建规则**页面,点击页面最下方的**保存**以完成规则创建。
2. 从**动作类型**下拉列表中选择**消息重发布**。
3. 在 **主题** 和 **Payload** 字段中,您可以输入您想重新发布的消息的主题和 payload。例如,对于此演示,输入 `t/1` 和 `${.}`。

- **主题** 中 也可以使用 `${}` 来动态指定 mqtt topic 主题,例如 `t/${key}` (注意:**${}** 中传入的参数,需要包含在 SQL `Select` 语句中)
4. 点击**添加**将此动作包含在规则中。
5. 回到**创建规则**页面,点击页面最下方的**保存**以完成规则创建。

![Kafka_consumer_rule](./assets/Kafka_consumer_rule.png)

Expand Down