Skip to content

Commit

Permalink
fix(kafka): fix kafka source bug
Browse files Browse the repository at this point in the history
Signed-off-by: diaobisong <bitdbsd11@163.com>
  • Loading branch information
dbsd11 authored and ngjaying committed Feb 26, 2025
1 parent efd450f commit 4e96043
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 192 deletions.
6 changes: 4 additions & 2 deletions extensions/sources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package main
import (
"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/extensions/impl/sql"
"github.com/lf-edge/ekuiper/v2/extensions/impl/kafka"
)

func Kafka() api.Source { return sql.GetSource() }
func Kafka() api.Source { return kafka.GetSource() }

func KafkaLookup() api.Source { return kafka.GetSource() }
207 changes: 17 additions & 190 deletions extensions/sources/kafka/kafka.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
"website": "https://www.github.com/carlclone"
},
"helpUrl": {
"en_US": "https://ekuiper.org/docs/en/latest/guide/sinks/plugin/kafka.html",
"zh_CN": "https://ekuiper.org/docs/zh/latest/guide/sinks/plugin/kafka.html"
"en_US": "https://ekuiper.org/docs/en/latest/guide/sources/plugin/kafka.html",
"zh_CN": "https://ekuiper.org/docs/zh/latest/guide/sources/plugin/kafka.html"
},
"description": {
"en_US": "This a sink for Kafka, it can be used for saving the analysis data into Kafka.",
"zh_CN": "为 Kafka 的持久化插件,可以用于将分析数据存入 Kafka 中"
"en_US": "This a source for Kafka, it can be used for consume Kafka message.",
"zh_CN": "kafka source插件"
}
},
"libs": [
Expand All @@ -36,215 +36,42 @@
}
},
{
"name": "topic",
"default": "",
"name": "groupID",
"default": "kuiper-source",
"optional": false,
"control": "text",
"type": "string",
"hint": {
"en_US": "The topic to publish to.",
"zh_CN": "订阅主题"
},
"label": {
"en_US": "Topic",
"zh_CN": "主题"
}
},
{
"name": "requiredACKs",
"default": "",
"optional": false,
"control": "text",
"type": "int",
"hint": {
"en_US": "The mechanism for Kafka client to confirm messages",
"zh_CN": "Kafka 客户端确认消息的机制"
},
"label": {
"en_US": "requiredACKs",
"zh_CN": "确认机制"
}
},
{
"name": "saslAuthType",
"default": "none",
"optional": false,
"control": "select",
"values": [
"none",
"plain",
"scram"
],
"type": "string",
"hint": {
"en_US": "Sasl auth type of Kafka",
"zh_CN": "Kafka 的 Sasl 认证类型"
},
"label": {
"en_US": "Sasl auth type",
"zh_CN": "Sasl 认证类型"
}
},
{
"name": "saslUserName",
"default": "",
"optional": true,
"control": "text",
"type": "string",
"hint": {
"en_US": "Sasl username for authentication",
"zh_CN": "Sasl 认证使用的用户名"
},
"label": {
"en_US": "Sasl username",
"zh_CN": "Sasl 用户名"
}
},
{
"name": "saslPassword",
"default": "",
"optional": true,
"control": "text",
"type": "string",
"hint": {
"en_US": "Sasl password for authentication",
"zh_CN": "Sasl 认证使用的密码"
},
"label": {
"en_US": "Sasl password",
"zh_CN": "Sasl 密码"
}
},
{
"name": "certificationPath",
"default": "",
"optional": true,
"connection_related": true,
"control": "text",
"type": "string",
"hint": {
"en_US": "The location of certification path. It can be an absolute path, or a relative path.",
"zh_CN": "证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 server 命令的路径。"
},
"label": {
"en_US": "Certification path",
"zh_CN": "证书路径"
}
},
{
"name": "privateKeyPath",
"default": "",
"optional": true,
"connection_related": true,
"control": "text",
"type": "string",
"hint": {
"en_US": "The location of private key path. It can be an absolute path, or a relative path. ",
"zh_CN": "私钥路径。可以为绝对路径,也可以为相对路径。"
},
"label": {
"en_US": "Private key path",
"zh_CN": "私钥路径"
}
},
{
"name": "rootCaPath",
"default": "",
"optional": true,
"connection_related": true,
"control": "text",
"type": "string",
"hint": {
"en_US": "The location of root ca path. It can be an absolute path, or a relative path. ",
"zh_CN": "根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径。"
},
"label": {
"en_US": "Root CA path",
"zh_CN": "根证书路径"
}
},
{
"name": "insecureSkipVerify",
"default": true,
"optional": true,
"control": "radio",
"type": "bool",
"hint": {
"en_US": "Control if to skip the certification verification. If it is set to true, then skip certification verification; Otherwise, verify the certification.",
"zh_CN": "控制是否跳过证书认证。如果被设置为 true,那么跳过证书认证;否则进行证书验证。"
},
"label": {
"en_US": "Skip Certification verification",
"zh_CN": "跳过证书验证"
}
},
{
"name": "batchSize",
"default": 1,
"optional": true,
"control": "text",
"type": "int",
"hint": {
"en_US": "Limit on how many messages will be buffered before being sent to kafka in one request, default 1",
"zh_CN": "一次性向 kafka 发送批量消息的数量"
},
"label": {
"en_US": "Batch Size",
"zh_CN": "批量消息发送"
}
},
{
"name": "maxAttempts",
"default": 1,
"optional": true,
"control": "text",
"type": "int",
"hint": {
"en_US": "Limit on how many attempts will be made to deliver a message, default 1",
"zh_CN": "发送消息失败的重试次数"
},
"label": {
"en_US": "Max Attempts",
"zh_CN": "消息发送重发次数"
}
},
{
"name": "headers",
"default": {},
"optional": true,
"control": "list",
"type": "object",
"hint": {
"en_US": "Message Headers",
"zh_CN": "Headers in Kafka Message"
"en_US": "The groupId of the Kafka consumer",
"zh_CN": "Kafka 消费组名"
},
"label": {
"en_US": "Headers for the message",
"zh_CN": "Kafka 消息 Headers"
"en_US": "group id",
"zh_CN": "Kafka 消费组名"
}
},
{
"name": "key",
"name": "datasource",
"default": "",
"optional": true,
"control": "text",
"type": "string",
"hint": {
"en_US": "Message Key",
"zh_CN": "Kafka Message Key"
"en_US": "The topic of the Kafka consumer",
"zh_CN": "Kafka 消费topic"
},
"label": {
"en_US": "key for the message",
"zh_CN": "Kafka 消息 Key"
"en_US": "topic",
"zh_CN": "Kafka 消费topic"
}
}
],
"node": {
"category": "sink",
"category": "source",
"icon": "iconPath",
"label": {
"en": "Kafka",
"zh": "Kafka"
}
}
}
}
29 changes: 29 additions & 0 deletions extensions/sources/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestKafka(t *testing.T) {
kfka := Kafka()
require.NotNil(t, kfka)

kfkaLookup := KafkaLookup()
require.NotNil(t, kfkaLookup)
}

0 comments on commit 4e96043

Please sign in to comment.