From 7dbd431375259d82336bd02a81facb82c42fb6d7 Mon Sep 17 00:00:00 2001 From: Jianxiang Ran Date: Wed, 20 Oct 2021 16:25:20 +0800 Subject: [PATCH] feat(share connection): move connection.yaml into connections folder Signed-off-by: Jianxiang Ran --- docs/en_US/rules/sources/edgex.md | 36 +++++++------ docs/en_US/rules/sources/mqtt.md | 26 ++++----- docs/zh_CN/rules/sources/edgex.md | 36 +++++++------ docs/zh_CN/rules/sources/mqtt.md | 25 +++++---- etc/{ => connections}/connection.yaml | 25 +++++++-- internal/topo/connection/connect_selector.go | 8 +-- .../topo/connection/connect_selector_test.go | 54 +++++++------------ internal/topo/connection/manager_test.go | 2 +- 8 files changed, 109 insertions(+), 103 deletions(-) rename etc/{ => connections}/connection.yaml (56%) diff --git a/docs/en_US/rules/sources/edgex.md b/docs/en_US/rules/sources/edgex.md index 5899d843bc..12057c4b32 100644 --- a/docs/en_US/rules/sources/edgex.md +++ b/docs/en_US/rules/sources/edgex.md @@ -89,29 +89,31 @@ The port of EdgeX message bus, default value is ``5573``. ### connectionSelector -specify the stream to reuse the connection to EdgeX message bus. The connection profile located in ``connection.yaml``. +specify the stream to reuse the connection to EdgeX message bus. The connection profile located in ``connections/connection.yaml``. ```yaml -mqtt: - mqtt_conf1: #connection key - servers: [tcp://127.0.0.1:1883] - username: ekuiper - password: password - #certificationPath: /var/kuiper/xyz-certificate.pem - #privateKeyPath: /var/kuiper/xyz-private.pem.key - #insecureSkipVerify: false - #protocolVersion: 3 - clientid: ekuiper - mqtt_conf2: #connection key - servers: ["tcp://127.0.0.1:1883"] - edgex: - edgex_conf1: #connection key + redisMsgBus: #connection key protocol: redis server: 127.0.0.1 port: 6379 type: redis + # Below is optional configurations settings for mqtt + # type: mqtt + # optional: + # ClientId: client1 + # Username: user1 + # Password: password + # Qos: 1 + # KeepAlive: 5000 + # Retained: true/false + # ConnectionPayload: + # CertFile: + # KeyFile: + # CertPEMBlock: + # KeyPEMBlock: + # SkipCertVerify: true/false ``` -There is one configuration group for EdgeX message bus in the example, user need use ``edgex.edgex_conf1`` as the selector. +There is one configuration group for EdgeX message bus in the example, user need use ``edgex.redisMsgBus`` as the selector. For example ```yaml #Global Edgex configurations @@ -119,7 +121,7 @@ default: protocol: tcp server: localhost port: 5573 - connectionSelector: edgex.edgex_conf1 + connectionSelector: edgex.redisMsgBus topic: events messageType: event # optional: diff --git a/docs/en_US/rules/sources/mqtt.md b/docs/en_US/rules/sources/mqtt.md index 5273f5734a..ffa5cae1f3 100644 --- a/docs/en_US/rules/sources/mqtt.md +++ b/docs/en_US/rules/sources/mqtt.md @@ -51,10 +51,10 @@ The location of private key path. It can be an absolute path, or a relative path ### connectionSelector -specify the stream to reuse the connection to mqtt broker. The connection profile located in ``connection.yaml``. +specify the stream to reuse the connection to mqtt broker. The connection profile located in ``connections/connection.yaml``. ```yaml mqtt: - mqtt_conf1: #connection key + localConnection: #connection key servers: [tcp://127.0.0.1:1883] username: ekuiper password: password @@ -63,17 +63,17 @@ mqtt: #insecureSkipVerify: false #protocolVersion: 3 clientid: ekuiper - mqtt_conf2: #connection key - servers: ["tcp://127.0.0.1:1883"] - -edgex: - edgex_conf1: #connection key - protocol: redis - server: 127.0.0.1 - port: 6379 - type: redis + cloudConnection: #connection key + servers: ["tcp://broker.emqx.io:1883"] + username: user1 + password: password + #certificationPath: /var/kuiper/xyz-certificate.pem + #privateKeyPath: /var/kuiper/xyz-private.pem.ke + #insecureSkipVerify: false + #protocolVersion: 3 + ``` -There are two configuration groups for mqtt in the example, user need use ``mqtt.mqtt_conf1`` or ``mqtt.mqtt_conf2`` as the selector. +There are two configuration groups for mqtt in the example, user need use ``mqtt.localConnection`` or ``mqtt.cloudConnection`` as the selector. For example ```yaml #Global MQTT configurations @@ -84,7 +84,7 @@ default: #password: password #certificationPath: /var/kuiper/xyz-certificate.pem #privateKeyPath: /var/kuiper/xyz-private.pem.key - connectionSelector: mqtt.mqtt_conf1 + connectionSelector: mqtt.localConnection ``` *Note*: once specify the connectionSelector in specific configuration group , all connection related parameters will be ignored , in this case ``servers: [tcp://127.0.0.1:1883]`` diff --git a/docs/zh_CN/rules/sources/edgex.md b/docs/zh_CN/rules/sources/edgex.md index fafb8d000a..6c88bb6a6d 100644 --- a/docs/zh_CN/rules/sources/edgex.md +++ b/docs/zh_CN/rules/sources/edgex.md @@ -89,36 +89,38 @@ EdgeX 消息总线的端口,缺省为 `5573` ## connectionSelector -复用 EdgeX 源连接。连接配置信息位于 ``connection.yaml``. +复用 EdgeX 源连接。连接配置信息位于 ``connections/connection.yaml``. ```yaml -mqtt: - mqtt_conf1: #connection key - servers: [tcp://127.0.0.1:1883] - username: ekuiper - password: password - #certificationPath: /var/kuiper/xyz-certificate.pem - #privateKeyPath: /var/kuiper/xyz-private.pem.key - #insecureSkipVerify: false - #protocolVersion: 3 - clientid: ekuiper - mqtt_conf2: #connection key - servers: ["tcp://127.0.0.1:1883"] - edgex: - edgex_conf1: #connection key + redisMsgBus: #connection key protocol: redis server: 127.0.0.1 port: 6379 type: redis + # Below is optional configurations settings for mqtt + # type: mqtt + # optional: + # ClientId: client1 + # Username: user1 + # Password: password + # Qos: 1 + # KeepAlive: 5000 + # Retained: true/false + # ConnectionPayload: + # CertFile: + # KeyFile: + # CertPEMBlock: + # KeyPEMBlock: + # SkipCertVerify: true/false ``` -对于 EdgeX 连接,这里有一个配置组。用户应该使用 ``edgex.edgex_conf1`` 来作为参数。举例如下: +对于 EdgeX 连接,这里有一个配置组。用户应该使用 ``edgex.redisMsgBus`` 来作为参数。举例如下: ```yaml #Global Edgex configurations default: protocol: tcp server: localhost port: 5573 - connectionSelector: edgex.edgex_conf1 + connectionSelector: edgex.redisMsgBus topic: events messageType: event ``` diff --git a/docs/zh_CN/rules/sources/mqtt.md b/docs/zh_CN/rules/sources/mqtt.md index 4c859f3ec3..7f7a7c510f 100755 --- a/docs/zh_CN/rules/sources/mqtt.md +++ b/docs/zh_CN/rules/sources/mqtt.md @@ -51,10 +51,10 @@ MQTT 连接密码。如果指定了 `certificationPath` 或者 `privateKeyPath` ### connectionSelector -复用 MQTT 源连接。连接配置信息位于 ``connection.yaml``. +复用 MQTT 源连接。连接配置信息位于 ``connections/connection.yaml``. ```yaml mqtt: - mqtt_conf1: #connection key + localConnection: #connection key servers: [tcp://127.0.0.1:1883] username: ekuiper password: password @@ -63,17 +63,16 @@ mqtt: #insecureSkipVerify: false #protocolVersion: 3 clientid: ekuiper - mqtt_conf2: #connection key - servers: ["tcp://127.0.0.1:1883"] - -edgex: - edgex_conf1: #connection key - protocol: redis - server: 127.0.0.1 - port: 6379 - type: redis + cloudConnection: #connection key + servers: ["tcp://broker.emqx.io:1883"] + username: user1 + password: password + #certificationPath: /var/kuiper/xyz-certificate.pem + #privateKeyPath: /var/kuiper/xyz-private.pem.ke + #insecureSkipVerify: false + #protocolVersion: 3 ``` -对于 MQTT 连接,这里有两个配置组。用户应该使用 ``mqtt.mqtt_conf1`` 或者 ``mqtt.mqtt_conf2`` 来作为参数。举例如下: +对于 MQTT 连接,这里有两个配置组。用户应该使用 ``mqtt.localConnection`` 或者 ``mqtt.cloudConnection`` 来作为参数。举例如下: ```yaml #Global MQTT configurations default: @@ -83,7 +82,7 @@ default: #password: password #certificationPath: /var/kuiper/xyz-certificate.pem #privateKeyPath: /var/kuiper/xyz-private.pem.key - connectionSelector: mqtt.mqtt_conf + connectionSelector: mqtt.localConnection ``` *注意*: 相应配置组一旦指定 connectionSelector 参数,所有关于连接的参数都会被忽略. 上面例子中,`` servers: [tcp://127.0.0.1:1883]`` 会被忽略。 diff --git a/etc/connection.yaml b/etc/connections/connection.yaml similarity index 56% rename from etc/connection.yaml rename to etc/connections/connection.yaml index a1fbcd30fa..fd4663a2d8 100644 --- a/etc/connection.yaml +++ b/etc/connections/connection.yaml @@ -1,5 +1,5 @@ mqtt: - mqtt_conf1: #connection key + localConnection: #connection key servers: [tcp://127.0.0.1:1883] username: ekuiper password: password @@ -8,11 +8,18 @@ mqtt: #insecureSkipVerify: false #protocolVersion: 3 clientid: ekuiper - mqtt_conf2: #connection key - servers: ["tcp://127.0.0.1:1883"] + cloudConnection: #connection key + servers: ["tcp://broker.emqx.io:1883"] + username: user1 + password: password + #certificationPath: /var/kuiper/xyz-certificate.pem + #privateKeyPath: /var/kuiper/xyz-private.pem.ke + #insecureSkipVerify: false + #protocolVersion: 3 + edgex: - edgex_conf1: #connection key + redisMsgBus: #connection key protocol: redis server: 127.0.0.1 port: 6379 @@ -31,4 +38,12 @@ edgex: # KeyFile: # CertPEMBlock: # KeyPEMBlock: - # SkipCertVerify: true/false \ No newline at end of file + # SkipCertVerify: true/false + mqttMsgBus: #connection key + protocol: tcp + server: 127.0.0.1 + port: 1883 + topic: events + type: mqtt + optional: + ClientId: "client1" \ No newline at end of file diff --git a/internal/topo/connection/connect_selector.go b/internal/topo/connection/connect_selector.go index e71286a55c..af30d9b0be 100644 --- a/internal/topo/connection/connect_selector.go +++ b/internal/topo/connection/connect_selector.go @@ -8,6 +8,8 @@ import ( var SUPPORTE_CONTYPE = []string{"mqtt", "edgex"} +const CONNECTION_CONF = "connections/connection.yaml" + type ConSelector struct { ConnSelectorCfg string @@ -24,8 +26,8 @@ func (c *ConSelector) Init() error { if len(conTypeSel) != 2 { return fmt.Errorf("not a valid connection selector : %s", c.ConnSelectorCfg) } - c.Type = conTypeSel[0] - c.CfgKey = conTypeSel[1] + c.Type = strings.ToLower(conTypeSel[0]) + c.CfgKey = strings.ToLower(conTypeSel[1]) return nil } @@ -36,7 +38,7 @@ func (c *ConSelector) ReadCfgFromYaml() (props map[string]interface{}, err error ) cfg := make(map[string]interface{}) - err = conf.LoadConfigByName("connection.yaml", &cfg) + err = conf.LoadConfigByName(CONNECTION_CONF, &cfg) if err != nil { return nil, err } diff --git a/internal/topo/connection/connect_selector_test.go b/internal/topo/connection/connect_selector_test.go index 5d083de41a..18df703e30 100644 --- a/internal/topo/connection/connect_selector_test.go +++ b/internal/topo/connection/connect_selector_test.go @@ -8,7 +8,6 @@ import ( func Test_getConnectionConf(t *testing.T) { type args struct { - connectionType string connectionSelector string } tests := []struct { @@ -18,10 +17,9 @@ func Test_getConnectionConf(t *testing.T) { wantErr bool }{ { - name: "mqtt:mqtt_conf1", + name: "mqtt:localConnection", args: args{ - connectionType: "mqtt", - connectionSelector: "mqtt_conf1", + connectionSelector: "mqtt.localConnection", }, want: map[string]interface{}{ "servers": []interface{}{"tcp://127.0.0.1:1883"}, @@ -32,37 +30,28 @@ func Test_getConnectionConf(t *testing.T) { wantErr: false, }, { - name: "mqtt:mqtt_conf2", + name: "mqtt:cloudConnection", args: args{ - connectionType: "mqtt", - connectionSelector: "mqtt_conf2", + connectionSelector: "mqtt.cloudConnection", }, want: map[string]interface{}{ - "servers": []interface{}{"tcp://127.0.0.1:1883"}, + "servers": []interface{}{"tcp://broker.emqx.io:1883"}, + "username": "user1", + "password": "password", }, wantErr: false, }, { name: "mqtt:mqtt_conf3 not exist", args: args{ - connectionType: "mqtt", - connectionSelector: "mqtt_conf3", - }, - wantErr: true, - }, - { - name: "mqtts:mqtt_conf3 not exist", - args: args{ - connectionType: "mqtts", - connectionSelector: "mqtt_conf3", + connectionSelector: "mqtt.mqtt_conf3", }, wantErr: true, }, { - name: "edgex:edgex_conf1", + name: "edgex:redisMsgBus", args: args{ - connectionType: "edgex", - connectionSelector: "edgex_conf1", + connectionSelector: "edgex.redisMsgBus", }, want: map[string]interface{}{ "protocol": "redis", @@ -76,9 +65,9 @@ func Test_getConnectionConf(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := ConSelector{ - Type: tt.args.connectionType, - CfgKey: tt.args.connectionSelector, + ConnSelectorCfg: tt.args.connectionSelector, } + _ = c.Init() got, err := c.ReadCfgFromYaml() if (err != nil) != tt.wantErr { @@ -93,10 +82,10 @@ func Test_getConnectionConf(t *testing.T) { } func Test_getConnectionConfWithEnv(t *testing.T) { - mqttServerKey := "CONNECTION__MQTT__MQTT_CONF1__SERVERS" + mqttServerKey := "CONNECTION__MQTT__LOCALCONNECTION__SERVERS" mqttServerValue := "[tcp://broker.emqx.io:1883]" - edgexPortKey := "CONNECTION__EDGEX__EDGEX_CONF1__PORT" + edgexPortKey := "CONNECTION__EDGEX__REDISMSGBUS__PORT" edgexPortValue := "6666" err := os.Setenv(mqttServerKey, mqttServerValue) @@ -109,7 +98,6 @@ func Test_getConnectionConfWithEnv(t *testing.T) { } type args struct { - connectionType string connectionSelector string } tests := []struct { @@ -119,10 +107,9 @@ func Test_getConnectionConfWithEnv(t *testing.T) { wantErr bool }{ { - name: "mqtt:mqtt_conf1", + name: "mqtt:localConnection", args: args{ - connectionType: "mqtt", - connectionSelector: "mqtt_conf1", + connectionSelector: "mqtt.localConnection", }, want: map[string]interface{}{ "servers": []interface{}{"tcp://broker.emqx.io:1883"}, @@ -133,10 +120,9 @@ func Test_getConnectionConfWithEnv(t *testing.T) { wantErr: false, }, { - name: "edgex:edgex_conf1", + name: "edgex:redisMsgBus", args: args{ - connectionType: "edgex", - connectionSelector: "edgex_conf1", + connectionSelector: "edgex.redisMsgBus", }, want: map[string]interface{}{ "protocol": "redis", @@ -150,9 +136,9 @@ func Test_getConnectionConfWithEnv(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := ConSelector{ - Type: tt.args.connectionType, - CfgKey: tt.args.connectionSelector, + ConnSelectorCfg: tt.args.connectionSelector, } + _ = c.Init() got, err := c.ReadCfgFromYaml() if (err != nil) != tt.wantErr { diff --git a/internal/topo/connection/manager_test.go b/internal/topo/connection/manager_test.go index 9400b43553..ca38443bf9 100644 --- a/internal/topo/connection/manager_test.go +++ b/internal/topo/connection/manager_test.go @@ -37,7 +37,7 @@ func TestManager(t *testing.T) { return &MockClient{selector: super} }) - conSelector := "mqtt.mqtt_conf1" + conSelector := "mqtt.localConnection" connection, err := GetConnection(conSelector) if err != nil {