EMQ X 消息服务器通过模块注册和钩子(Hooks)机制,支持用户开发扩展插件定制服务器认证鉴权与业务功能。
EMQ X 官方提供的插件包括:
插件 | 配置文件 | 说明 |
---|---|---|
emqx_dashboard | etc/plugins/emqx_dashbord.conf | Web 控制台插件(默认加载) |
emqx_management | etc/plugins/emqx_management.conf | HTTP API 与 CLI 管理插件 |
emqx_psk_file | etc/plugins/emqx_psk_file.conf | PSK 支持 |
emqx_web_hook | etc/plugins/emqx_web_hook.conf | Web Hook 插件 |
emqx_lua_hook | etc/plugins/emqx_lua_hook.conf | Lua Hook 插件 |
emqx_retainer | etc/plugins/emqx_retainer.conf | Retain 消息存储模块 |
emqx_rule_engine | etc/plugins/emqx_rule_engine.conf | 规则引擎 |
emqx_bridge_mqtt | etc/plugins/emqx_bridge_mqtt.conf | MQTT 消息桥接 |
emqx_delayed_publish | etc/plugins/emqx_delayed_publish.conf | 客户端延时发布消息支持 |
emqx_coap | etc/plugins/emqx_coap.conf | CoAP 协议支持 |
emqx_lwm2m | etc/plugins/emqx_lwm2m.conf | LwM2M 协议支持 |
emqx_sn | etc/plugins/emqx_sn.conf | MQTT-SN 协议支持 |
emqx_stomp | etc/plugins/emqx_stomp.conf | Stomp 协议支持 |
emqx_recon | etc/plugins/emqx_recon.conf | Recon 性能调试 |
emqx_reloader | etc/plugins/emqx_reloader.conf | Reloader 代码热加载插件 |
emqx_plugin_template | etc/plugins/emqx_plugin_template.conf | 插件开发模版 |
其中插件的加载有四种方式:
- 默认加载
- 命令行启停插件
- 使用 Dashboard 启停插件
- 调用管理 API 启停插件
开启默认加载
如需在系统启动时就默认启动某插件,则直接在 data/loaded_plugins
配置入需要启动的插件,例如默认开启的加载的插件有:
emqx_management.
emqx_rule_engine.
emqx_recon.
emqx_retainer.
emqx_dashboard.
命令行启停插件
在运行过程中,我们可以通过 CLI 命令的方式查看可用的插件列表、和启停某插件:
## 显示所有可用的插件列表
./bin/emqx_ctl plugins list
## 加载某插件
./bin/emqx_ctl plugins load emqx_auth_username
## 卸载某插件
./bin/emqx_ctl plugins unload emqx_auth_username
## 重新加载某插件
./bin/emqx_ctl plugins reload emqx_auth_username
使用 Dashboard 启停插件
如果 EMQ X 开启了 Dashbord 的插件(默认开启) 还可以直接通过访问 http://localhost:18083/plugins
中的插件管理页面启停、或者配置插件。
emqx_dashboard 是 EMQ X 消息服务器的 Web 管理控制台, 该插件默认开启。当 EMQ X 启动成功后,可访问 http://localhost:18083
进行查看,默认用户名/密码: admin/public。
Dashboard 中可查询 EMQ X 消息服务器基本信息、统计数据、负载情况,查询当前客户端列表(Connections)、会话(Sessions)、路由表(Topics)、订阅关系(Subscriptions) 等详细信息。
除此之外,Dashboard 默认提供了一系列的 REST API 供前端调用。其详情可以参考 Dashboard -> HTTP API
部分。
etc/plugins/emqx_dashboard.conf:
## Dashboard 默认用户名/密码
dashboard.default_user.login = admin
dashboard.default_user.password = public
## Dashboard HTTP 服务端口配置
dashboard.listener.http = 18083
dashboard.listener.http.acceptors = 2
dashboard.listener.http.max_clients = 512
## Dashboard HTTPS 服务端口配置
## dashboard.listener.https = 18084
## dashboard.listener.https.acceptors = 2
## dashboard.listener.https.max_clients = 512
## dashboard.listener.https.handshake_timeout = 15s
## dashboard.listener.https.certfile = etc/certs/cert.pem
## dashboard.listener.https.keyfile = etc/certs/key.pem
## dashboard.listener.https.cacertfile = etc/certs/cacert.pem
## dashboard.listener.https.verify = verify_peer
## dashboard.listener.https.fail_if_no_peer_cert = true
emqx_management 是 EMQ X 消息服务器的 HTTP API 与 CLI 管理插件,该插件默认开启。当 EMQ X 启动成功后,用户即可通过该插件提供的 HTTP API 与 CLI 进行查询当前客户端列表等操作,详见 :ref:`rest_api` 与 :ref:`commands`。
etc/plugins/emqx_management.conf:
## 最多返回多少条数据,用于分页机制
management.max_row_limit = 10000
## 默认的应用 secret
# management.application.default_secret = public
## Management HTTP 服务器端口配置
management.listener.http = 8080
management.listener.http.acceptors = 2
management.listener.http.max_clients = 512
management.listener.http.backlog = 512
management.listener.http.send_timeout = 15s
management.listener.http.send_timeout_close = on
## Management HTTPS 服务器端口配置
## management.listener.https = 8081
## management.listener.https.acceptors = 2
## management.listener.https.max_clients = 512
## management.listener.https.backlog = 512
## management.listener.https.send_timeout = 15s
## management.listener.https.send_timeout_close = on
## management.listener.https.certfile = etc/certs/cert.pem
## management.listener.https.keyfile = etc/certs/key.pem
## management.listener.https.cacertfile = etc/certs/cacert.pem
## management.listener.https.verify = verify_peer
## management.listener.https.fail_if_no_peer_cert = true
emqx_psk_file 插件主要提供了 PSK 支持。其目的是用于在客户端建立 TLS/DTLS 连接时,通过 PSK 方式实现 连接认证 的功能。
etc/plugins/emqx_psk_file.conf:
psk.file.path = etc/psk.txt
emqx_web_hook 插件可以将所有 EMQ X 的事件及消息都发送到指定的 HTTP 服务器。
etc/plugins/emqx_web_hook.conf:
## 回调的 Web Server 地址
web.hook.api.url = http://127.0.0.1:8080
## 编码 Payload 字段
## 枚举值: undefined | base64 | base62
## 默认值: undefined (不进行编码)
## web.hook.encode_payload = base64
## 消息、事件配置
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"}
web.hook.rule.client.unsubscribe.1 = {"action": "on_client_unsubscribe"}
web.hook.rule.session.created.1 = {"action": "on_session_created"}
web.hook.rule.session.subscribed.1 = {"action": "on_session_subscribed"}
web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
web.hook.rule.session.terminated.1 = {"action": "on_session_terminated"}
web.hook.rule.message.publish.1 = {"action": "on_message_publish"}
web.hook.rule.message.deliver.1 = {"action": "on_message_deliver"}
web.hook.rule.message.acked.1 = {"action": "on_message_acked"}
emqx_lua_hook 插件将所有的事件和消息都发送到指定的 Lua 函数上。其具体使用参见其 README。
emqx_retainer 该插件设置为默认启动,为 EMQ X 提供 Retained 类型的消息支持。它会将所有主题的 Retained 消息存储在集群的数据库中,并待有客户端订阅该主题的时候将该消息投递出去。
etc/plugins/emqx_retainer.conf:
## retained 消息存储方式
## - ram: 仅内存
## - disc: 内存和磁盘
## - disc_only: 仅磁盘
retainer.storage_type = ram
## 最大存储数 (0表示未限制)
retainer.max_retained_messages = 0
## 单条最大可存储消息大小
retainer.max_payload_size = 1MB
## 过期时间, 0 表示永不过期
## 单位: h 小时; m 分钟; s 秒。如 60m 表示 60 分钟
retainer.expiry_interval = 0
桥接 的概念是 EMQ X 支持将自身某类主题的消息通过某种方式转发到另一个 MQTT Broker。
桥接 与 集群 的不同在于:桥接不会复制主题树与路由表,只根据桥接规则转发 MQTT 消息。
目前 MQTT 消息插件支持的桥接方式有:
- RPC 桥接:RPC 桥接只能在 EMQ X Broker 间使用,且不支持订阅远程节点的主题去同步数据
- MQTT 桥接:MQTT 桥接同时支持转发和通过订阅主题来实现数据同步两种方式
在 EMQ X 中,通过修改 etc/plugins/emqx_bridge_mqtt.conf
来配置 bridge。EMQ X 根据不同的 name 来区分不同的 bridge。例如:
## 桥接地址: 使用节点名(nodename@host)则用于 RPC 桥接,使用 host:port 用于 MQTT 连接 bridge.mqtt.aws.address = 127.0.0.1:1883
该项配置声明了一个名为 aws
的 bridge 并指定以 MQTT 的方式桥接到 127.0.0.1:1883
这台 MQTT 服务器
在需要创建多个 bridge 时,可以先复制其全部的配置项,在通过使用不同的 name 来标示(比如 bridge.mqtt.$name.address 其中 $name 指代的为 bridge 的名称)
etc/plugins/emqx_bridge_mqtt.conf
## 桥接地址: 使用节点名(nodename@host)则用于 RPC 桥接,使用 host:port 用于 MQTT 连接
bridge.mqtt.aws.address = 192.168.1.2:1883
## 桥接的协议版本
## 枚举值: mqttv3 | mqttv4 | mqttv5
bridge.mqtt.aws.proto_ver = mqttv4
## 客户端的 clientid
bridge.mqtt.aws.clientid = bridge_emq
## 客户端的 clean_start 字段
## 注: 有些 MQTT Broker 需要将 clean_start 值设成 `true`
bridge.mqtt.aws.clean_start = true
## 客户端的 username 字段
bridge.mqtt.aws.username = user
## 客户端的 password 字段
bridge.mqtt.aws.password = passwd
## 客户端是否使用 ssl 来连接远程服务器
bridge.mqtt.aws.ssl = off
## 客户端 SSL 连接的 CA 证书 (PEM格式)
bridge.mqtt.aws.cacertfile = etc/certs/cacert.pem
## 客户端 SSL 连接的 SSL 证书
bridge.mqtt.aws.certfile = etc/certs/client-cert.pem
## 客户端 SSL 连接的密钥文件
bridge.mqtt.aws.keyfile = etc/certs/client-key.pem
## SSL 加密方式
bridge.mqtt.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## TLS PSK 的加密套件
## 注意 'listener.ssl.external.ciphers' 和 'listener.ssl.external.psk_ciphers' 不能同时配置
##
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
## bridge.mqtt.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## 客户端的心跳间隔
bridge.mqtt.aws.keepalive = 60s
## 支持的 TLS 版本
bridge.mqtt.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1
## 需要被转发的消息的主题
bridge.mqtt.aws.forwards = sensor1/#,sensor2/#
## 挂载点(mountpoint)
bridge.mqtt.aws.mountpoint = bridge/emqx2/${node}/
## 订阅对端的主题
bridge.mqtt.aws.subscription.1.topic = cmd/topic1
## 订阅对端主题的 QoS
bridge.mqtt.aws.subscription.1.qos = 1
## 桥接的重连间隔
## 默认: 30秒
bridge.mqtt.aws.reconnect_interval = 30s
## QoS1/QoS2 消息的重传间隔
bridge.mqtt.aws.retry_interval = 20s
## Inflight 大小.
bridge.mqtt.aws.max_inflight_batches = 32
## emqx_bridge 内部用于 batch 的消息数量
bridge.mqtt.aws.queue.batch_count_limit = 32
## emqx_bridge 内部用于 batch 的消息字节数
bridge.mqtt.aws.queue.batch_bytes_limit = 1000MB
## 放置 replayq 队列的路径,如果没有在配置中指定该项,那么 replayq
## 将会以 `mem-only` 的模式运行,消息不会缓存到磁盘上。
bridge.mqtt.aws.queue.replayq_dir = data/emqx_aws_bridge/
## Replayq 数据段大小
bridge.mqtt.aws.queue.replayq_seg_bytes = 10MB
emqx_delayed_publish 提供了延迟发送消息的功能。当客户端使用特殊主题前缀 $delayed/<seconds>/
发布消息到 EMQ X 时,EMQ X 将在 <seconds>
秒后发布该主题消息。
emqx_coap 提供对 CoAP 协议(RFC 7252)的支持。
etc/plugins/emqx_coap.conf:
coap.port = 5683
coap.keepalive = 120s
coap.enable_stats = off
若开启以下配置,则可以支持 DTLS:
## DTLS 监听端口
coap.dtls.port = 5684
coap.dtls.keyfile = {{ platform_etc_dir }}/certs/key.pem
coap.dtls.certfile = {{ platform_etc_dir }}/certs/cert.pem
## 双向认证相关
## coap.dtls.verify = verify_peer
## coap.dtls.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
## coap.dtls.fail_if_no_peer_cert = false
我们可以通过安装 libcoap 来测试 EMQ X 对 CoAP 协议的支持情况。
yum install libcoap
% coap client publish message
coap-client -m put -e "qos=0&retain=0&message=payload&topic=hello" coap://localhost/mqtt
emqx_lwm2m 提供对 LwM2M 协议的支持。
etc/plugins/emqx_lwm2m.conf:
## LwM2M 监听端口
lwm2m.port = 5683
## Lifetime 限制
lwm2m.lifetime_min = 1s
lwm2m.lifetime_max = 86400s
## Q Mode 模式下 `time window` 长度, 单位秒。
## 超过该 window 的消息都将被缓存
#lwm2m.qmode_time_window = 22
## LwM2M 是否部署在 coaproxy 后
#lwm2m.lb = coaproxy
## 设备上线后,主动 observe 所有的 objects
#lwm2m.auto_observe = off
## client register 成功后主动向 EMQ X 订阅的主题
## 占位符:
## '%e': Endpoint Name
## '%a': IP Address
lwm2m.topics.command = lwm2m/%e/dn/#
## client 应答消息(response) 到 EMQ X 的主题
lwm2m.topics.response = lwm2m/%e/up/resp
## client 通知类消息(noify message) 到 EMQ X 的主题
lwm2m.topics.notify = lwm2m/%e/up/notify
## client 注册类消息(register message) 到 EMQ X 的主题
lwm2m.topics.register = lwm2m/%e/up/resp
# client 更新类消息(update message) 到 EMQ X 的主题
lwm2m.topics.update = lwm2m/%e/up/resp
# Object 定义的 xml 文件位置
lwm2m.xml_dir = etc/lwm2m_xml
同样可以通过以下配置打开 DTLS 支持:
# DTLS 证书配置
lwm2m.certfile = etc/certs/cert.pem
lwm2m.keyfile = etc/certs/key.pem
etc/plugins/emqx_sn.conf:
mqtt.sn.port = 1884
emqx_stomp 提供对 Stomp 协议的支持。支持客户端通过 Stomp 1.0/1.1/1.2 协议连接 EMQ X,发布订阅 MQTT 消息。
Note
Stomp 协议端口: 61613
etc/plugins/emqx_stomp.conf:
stomp.default_user.login = guest
stomp.default_user.passcode = guest
stomp.allow_anonymous = true
stomp.frame.max_headers = 10
stomp.frame.max_header_length = 1024
stomp.frame.max_body_length = 8192
stomp.listener = 61613
stomp.listener.acceptors = 4
stomp.listener.max_clients = 512
emqx_recon 插件集成了 recon 性能调测库,可用于查看当前系统的一些状态信息,例如:
./bin/emqx_ctl recon
recon memory #recon_alloc:memory/2
recon allocated #recon_alloc:memory(allocated_types, current|max)
recon bin_leak #recon:bin_leak(100)
recon node_stats #recon:node_stats(10, 1000)
recon remote_load Mod #recon:remote_load(Mod)
etc/plugins/emqx_recon.conf:
%% Garbage Collection: 10 minutes
recon.gc_interval = 600
emqx_reloader 用于开发调试的代码热升级插件。加载该插件后 EMQ X 会根据配置的时间间隔自动热升级更新代码。
同时,也提供了 CLI 命令来指定 reload 某一个模块:
./bin/emqx_ctl reload <Module>
Note
产品部署环境不建议使用该插件。
etc/plugins/emqx_reloader.conf:
reloader.interval = 60
reloader.logfile = log/reloader.log
emqx_plugin_template 是一个 EMQ X 插件模板,在功能上并无任何意义。
开发者需要自定义插件时,可以查看该插件的代码和结构,以更快地开发一个标准的 EMQ X 插件。插件实际是一个普通的 Erlang Application
,其配置文件为: etc/${PluginName}.config
。
参考 emqx_plugin_template 插件模版创建新的插件项目。
Note
在 <plugin name>_app.erl
文件中必须加上标签 -emqx_plugin(<type>).
以表明这是一个<type> 类型的插件。目前支持的枚举值有: auth | backend | bridge | ?MODULE
认证演示模块 - emqx_auth_demo.erl
-module(emqx_auth_demo).
-export([ init/1
, check/2
, description/0
]).
init(Opts) -> {ok, Opts}.
check(_Credentials = #{clientid := ClientId, username := Username, password := Password}, _State) ->
io:format("Auth Demo: clientId=~p, username=~p, password=~p~n", [ClientId, Username, Password]),
ok.
description() -> "Auth Demo Module".
访问控制演示模块 - emqx_acl_demo.erl
-module(emqx_acl_demo).
-include_lib("emqx/include/emqx.hrl").
%% ACL callbacks
-export([ init/1
, check_acl/5
, reload_acl/1
, description/0
]).
init(Opts) ->
{ok, Opts}.
check_acl({Credentials, PubSub, _NoMatchAction, Topic}, _State) ->
io:format("ACL Demo: ~p ~p ~p~n", [Credentials, PubSub, Topic]),
allow.
reload_acl(_State) ->
ok.
description() -> "ACL Demo Module".
注册认证、访问控制模块 - emqx_plugin_template_app.erl
ok = emqx:hook('client.authenticate', fun emqx_auth_demo:check/2, []),
ok = emqx:hook('client.check_acl', fun emqx_acl_demo:check_acl/5, []).
通过钩子(Hook)处理客户端上下线、主题订阅、消息收发。
emqx_plugin_template.erl:
%% Called when the plugin application start
load(Env) ->
emqx:hook('client.authenticate', fun ?MODULE:on_client_authenticate/2, [Env]),
emqx:hook('client.check_acl', fun ?MODULE:on_client_check_acl/5, [Env]),
emqx:hook('client.connected', fun ?MODULE:on_client_connected/4, [Env]),
emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqx:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]),
emqx:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]),
emqx:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
emqx:hook('session.resumed', fun ?MODULE:on_session_resumed/3, [Env]),
emqx:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqx:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
emqx:hook('session.terminated', fun ?MODULE:on_session_terminated/3, [Env]),
emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqx:hook('message.deliver', fun ?MODULE:on_message_deliver/3, [Env]),
emqx:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]),
emqx:hook('message.dropped', fun ?MODULE:on_message_dropped/3, [Env]).
所有可用钩子(Hook)说明:
钩子 | 说明 |
---|---|
client.authenticate | 连接认证 |
client.check_acl | ACL 校验 |
client.connected | 客户端上线 |
client.disconnected | 客户端连接断开 |
client.subscribe | 客户端订阅主题 |
client.unsubscribe | 客户端取消订阅主题 |
session.created | 会话创建 |
session.resumed | 会话恢复 |
session.subscribed | 会话订阅主题后 |
session.unsubscribed | 会话取消订阅主题后 |
session.terminated | 会话终止 |
message.publish | MQTT 消息发布 |
message.deliver | MQTT 消息进行投递 |
message.acked | MQTT 消息回执 |
message.dropped | MQTT 消息丢弃 |
扩展命令行演示模块 - emqx_cli_demo.erl
-module(emqx_cli_demo).
-export([cmd/1]).
cmd(["arg1", "arg2"]) ->
emqx_cli:print("ok");
cmd(_) ->
emqx_cli:usage([{"cmd arg1 arg2", "cmd demo"}]).
注册命令行模块 - emqx_plugin_template_app.erl
ok = emqx_ctl:register_command(cmd, {emqx_cli_demo, cmd}, []),
插件加载后,./bin/emqx_ctl
新增命令行:
./bin/emqx_ctl cmd arg1 arg2
插件自带配置文件放置在 etc/${plugin_name}.conf|config
。EMQ X 支持两种插件配置格式:
Erlang 原生配置文件格式 -
${plugin_name}.config
:[ {plugin_name, [ {key, value} ]} ].
sysctl 的
k = v
通用格式 -${plugin_name}.conf
:plugin_name.key = value
Note
k = v
格式配置需要插件开发者创建 priv/plugin_name.schema
映射文件。
- 解压 emqx-enterprise-rel.zip:
unzip emqx-enterprise-rel.zip
- rebar.config 添加依赖:
{deps,
[ {plugin_name, {git, "url_of_plugin", {tag, "tag_of_plugin"}}}
, ....
....
]
}
- rebar.config 中 relx 段落添加:
{relx,
[...
, ...
, {release, {emqx, git_describe},
[
{plugin_name, load},
]
}
]
}