diff --git a/extensions/sources/kafka/kafka.go b/extensions/sources/kafka/kafka.go index 6e695b51ac..29dd6b5aec 100644 --- a/extensions/sources/kafka/kafka.go +++ b/extensions/sources/kafka/kafka.go @@ -17,7 +17,9 @@ package main import ( "github.com/lf-edge/ekuiper/contract/v2/api" - "github.com/lf-edge/ekuiper/v2/extensions/impl/sql" + "github.com/lf-edge/ekuiper/v2/extensions/impl/kafka" ) -func Kafka() api.Source { return sql.GetSource() } +func Kafka() api.Source { return kafka.GetSource() } + +func KafkaLookup() api.Source { return kafka.GetSource() } diff --git a/extensions/sources/kafka/kafka.json b/extensions/sources/kafka/kafka.json index ad7a4783a5..526de69d61 100644 --- a/extensions/sources/kafka/kafka.json +++ b/extensions/sources/kafka/kafka.json @@ -8,12 +8,12 @@ "website": "https://www.github.com/carlclone" }, "helpUrl": { - "en_US": "https://ekuiper.org/docs/en/latest/guide/sinks/plugin/kafka.html", - "zh_CN": "https://ekuiper.org/docs/zh/latest/guide/sinks/plugin/kafka.html" + "en_US": "https://ekuiper.org/docs/en/latest/guide/sources/plugin/kafka.html", + "zh_CN": "https://ekuiper.org/docs/zh/latest/guide/sources/plugin/kafka.html" }, "description": { - "en_US": "This a sink for Kafka, it can be used for saving the analysis data into Kafka.", - "zh_CN": "为 Kafka 的持久化插件,可以用于将分析数据存入 Kafka 中" + "en_US": "This a source for Kafka, it can be used for consume Kafka message.", + "zh_CN": "kafka source插件" } }, "libs": [ @@ -36,215 +36,42 @@ } }, { - "name": "topic", - "default": "", + "name": "groupID", + "default": "kuiper-source", "optional": false, "control": "text", "type": "string", "hint": { - "en_US": "The topic to publish to.", - "zh_CN": "订阅主题" - }, - "label": { - "en_US": "Topic", - "zh_CN": "主题" - } - }, - { - "name": "requiredACKs", - "default": "", - "optional": false, - "control": "text", - "type": "int", - "hint": { - "en_US": "The mechanism for Kafka client to confirm messages", - "zh_CN": "Kafka 客户端确认消息的机制" - }, - "label": { - "en_US": "requiredACKs", - "zh_CN": "确认机制" - } - }, - { - "name": "saslAuthType", - "default": "none", - "optional": false, - "control": "select", - "values": [ - "none", - "plain", - "scram" - ], - "type": "string", - "hint": { - "en_US": "Sasl auth type of Kafka", - "zh_CN": "Kafka 的 Sasl 认证类型" - }, - "label": { - "en_US": "Sasl auth type", - "zh_CN": "Sasl 认证类型" - } - }, - { - "name": "saslUserName", - "default": "", - "optional": true, - "control": "text", - "type": "string", - "hint": { - "en_US": "Sasl username for authentication", - "zh_CN": "Sasl 认证使用的用户名" - }, - "label": { - "en_US": "Sasl username", - "zh_CN": "Sasl 用户名" - } - }, - { - "name": "saslPassword", - "default": "", - "optional": true, - "control": "text", - "type": "string", - "hint": { - "en_US": "Sasl password for authentication", - "zh_CN": "Sasl 认证使用的密码" - }, - "label": { - "en_US": "Sasl password", - "zh_CN": "Sasl 密码" - } - }, - { - "name": "certificationPath", - "default": "", - "optional": true, - "connection_related": true, - "control": "text", - "type": "string", - "hint": { - "en_US": "The location of certification path. It can be an absolute path, or a relative path.", - "zh_CN": "证书路径。可以为绝对路径,也可以为相对路径。如果指定的是相对路径,那么父目录为执行 server 命令的路径。" - }, - "label": { - "en_US": "Certification path", - "zh_CN": "证书路径" - } - }, - { - "name": "privateKeyPath", - "default": "", - "optional": true, - "connection_related": true, - "control": "text", - "type": "string", - "hint": { - "en_US": "The location of private key path. It can be an absolute path, or a relative path. ", - "zh_CN": "私钥路径。可以为绝对路径,也可以为相对路径。" - }, - "label": { - "en_US": "Private key path", - "zh_CN": "私钥路径" - } - }, - { - "name": "rootCaPath", - "default": "", - "optional": true, - "connection_related": true, - "control": "text", - "type": "string", - "hint": { - "en_US": "The location of root ca path. It can be an absolute path, or a relative path. ", - "zh_CN": "根证书路径,用以验证服务器证书。可以为绝对路径,也可以为相对路径。" - }, - "label": { - "en_US": "Root CA path", - "zh_CN": "根证书路径" - } - }, - { - "name": "insecureSkipVerify", - "default": true, - "optional": true, - "control": "radio", - "type": "bool", - "hint": { - "en_US": "Control if to skip the certification verification. If it is set to true, then skip certification verification; Otherwise, verify the certification.", - "zh_CN": "控制是否跳过证书认证。如果被设置为 true,那么跳过证书认证;否则进行证书验证。" - }, - "label": { - "en_US": "Skip Certification verification", - "zh_CN": "跳过证书验证" - } - }, - { - "name": "batchSize", - "default": 1, - "optional": true, - "control": "text", - "type": "int", - "hint": { - "en_US": "Limit on how many messages will be buffered before being sent to kafka in one request, default 1", - "zh_CN": "一次性向 kafka 发送批量消息的数量" - }, - "label": { - "en_US": "Batch Size", - "zh_CN": "批量消息发送" - } - }, - { - "name": "maxAttempts", - "default": 1, - "optional": true, - "control": "text", - "type": "int", - "hint": { - "en_US": "Limit on how many attempts will be made to deliver a message, default 1", - "zh_CN": "发送消息失败的重试次数" - }, - "label": { - "en_US": "Max Attempts", - "zh_CN": "消息发送重发次数" - } - }, - { - "name": "headers", - "default": {}, - "optional": true, - "control": "list", - "type": "object", - "hint": { - "en_US": "Message Headers", - "zh_CN": "Headers in Kafka Message" + "en_US": "The groupId of the Kafka consumer", + "zh_CN": "Kafka 消费组名" }, "label": { - "en_US": "Headers for the message", - "zh_CN": "Kafka 消息 Headers" + "en_US": "group id", + "zh_CN": "Kafka 消费组名" } }, { - "name": "key", + "name": "datasource", "default": "", "optional": true, "control": "text", "type": "string", "hint": { - "en_US": "Message Key", - "zh_CN": "Kafka Message Key" + "en_US": "The topic of the Kafka consumer", + "zh_CN": "Kafka 消费topic" }, "label": { - "en_US": "key for the message", - "zh_CN": "Kafka 消息 Key" + "en_US": "topic", + "zh_CN": "Kafka 消费topic" } } ], "node": { - "category": "sink", + "category": "source", "icon": "iconPath", "label": { "en": "Kafka", "zh": "Kafka" } } -} \ No newline at end of file +} diff --git a/extensions/sources/kafka/kafka_test.go b/extensions/sources/kafka/kafka_test.go new file mode 100644 index 0000000000..d7bfd85d8f --- /dev/null +++ b/extensions/sources/kafka/kafka_test.go @@ -0,0 +1,29 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestKafka(t *testing.T) { + kfka := Kafka() + require.NotNil(t, kfka) + + kfkaLookup := KafkaLookup() + require.NotNil(t, kfkaLookup) +}