在上一篇文章中,我们介绍了Gmqtt的基本特性以及钩子函数的基本使用方法。本篇我们来详细介绍Gmqtt的插件机制,以及如何编写插件。

session的生命周期

根据MQTT协议规范,每个客户端连接都会有一个与之对应的session,客户端可以指定该session是否需要持久化。对于一个持久化session,即使客户端离线,broker也会为其保留订阅信息以及与其订阅匹配的消息,当客户端重新上线后,broker会把这些消息投递给客户端。这使得在网络频繁闪断的环境下,也不会丢失消息。Gmqtt提供了钩子函数注入到session的生命周期中,使得插件可以管理session生命周期的变化,涉及生命周期的钩子函数有:

  • OnBasicAuth——收到CONNECT报文后调用,用作基本鉴权。
  • OnEnhancedAuth——增强型鉴权(针对V5协议的支持)
  • OnConnected——客户端连接成功
  • OnSessionCreated——客户端新建session成功
  • OnSessionResumed——客户端从session中恢复
  • OnSessionTerminated——session终止
  • OnClosed——客户端断开

下面我们以基本鉴权为例,图解这些钩子函数在整个session生命周期中的位置。

session建立

Gmqtt插件机制详解 - 图1

session删除

非持久化session会在连接关闭时删除,而对于持久化session,一般会设置一个超时时间,当session超时后,会被删除。

对于V3.1.1协议,客户端通过CONNECT报文的cleanSession字段来控制session是否需要持久化;
而在V5协议中,cleanSession字段更名成为了cleanStart字段,客户端通过设置超时时间来控制期望的session的保留时间。

session的删除过程如下图所示:

Gmqtt插件机制详解 - 图2

主题订阅/取消订阅流程

主题订阅和取消流程,涉及以下几个钩子函数:

  • OnSubscribe——常用钩子函数之一,当收到SUBSCRIBE报文后触发,可用作权限控制,主题改写等功能。
  • OnSubscribed——当成功订阅一个主题后触发。
  • OnUnsubscribe——当收到UNSUBSCRIBE报文时触发。可用作权限控制,主题改写等功能。
  • OnUnsubscribed——当取消订阅一个主题后触发。

订阅流程

Gmqtt插件机制详解 - 图3

取消订阅流程

Gmqtt插件机制详解 - 图4

消息发布流程

消息发布流程涉及以下几个钩子函数:

  • OnMsgArrived——常用钩子函数之一,当收到PUBLISH报文后触发,可用作权限控制,消息改写等功能。
  • OnDelivered——当消息投递到客户端后触发。(这个投递成功是从broker的角度,并不保证客户端一定收到)
  • OnMsgDropped——当消息被丢弃时触发。(可能由于队列满,消息超时等原因)

    Gmqtt插件机制详解 - 图5

wrapper模式

wrapper
英 /ˈræpə(r)/
n. 包装纸,包装材料;宽大长衣,浴衣;<美>(雪茄的)外卷烟叶;(可得到税收优惠等的)捆绑金融商品

wrapper模式,也叫做包装器或装饰器模式,是Go语言中非常流行的一种设计模式,常用于实现各样的middleware中间件。

例如下面这个简易的打印日志HTTP中间件:


type HTTPWrapper func(h http.HandlerFunc) http.HandlerFunc

// LogMiddleware 在请求前后打印日志。
func LogMiddleware(h http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        log.Println("开始处理请求")
        h(w, req)
        log.Println("请求处理完毕")
    }
}
func main() {
    var hdl http.HandlerFunc = func(w http.ResponseWriter, req *http.Request) {
        w.WriteHeader(200)
    }
    http.HandleFunc("/", LogMiddleware(hdl))
    http.ListenAndServe(":8080", nil)
}

以上程序的示例输出:

2020/12/20 15:17:03 开始处理请求
2020/12/20 15:17:03 处理请求
2020/12/20 15:17:03 请求处理完毕

在Gmqtt中,所有的钩子函数都有其对应的wrapper函数。插件需要声明本插件需要使用的wrapper函数,以鉴权插件为例:

https://github.com/DrmagicE/gmqtt/blob/v0.2.2/plugin/auth/hooks.go

// HookWrapper 返回Auth插件需要关心的wrapper函数
func (a *Auth) HookWrapper() server.HookWrapper {
    return server.HookWrapper{
        // Auth 鉴权插件只关心 OnBasicAuthWrapper
        OnBasicAuthWrapper: a.OnBasicAuthWrapper,
    }
}
func (a *Auth) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth {
    return func(ctx context.Context, client server.Client, req *server.ConnectRequest) (err error) {
        // 处理前一个插件的OnBasicAuth逻辑
        err = pre(ctx, client, req)
        if err != nil {
            return err
        }
        // ... 处理本插件的鉴权逻辑
    }
}

一个钩子函数可以被多个插件使用。插件利用wrapper模式,对钩子函数进行层层包装,最终将一个包装好的钩子函数注入到Gmqtt对应的挂载点上。例如在上述的代码示例中,Auth插件“包装了”上一个插件的钩子函数(pre server.OnBasicAuth),Auth插件选择先执行前一个插件的钩子函数,如果前一个插件返回失败,那么就直接返回失败,跳过本插件的鉴权逻辑。Auth插件也可以选择先执行本插件的鉴权逻辑:

func (a *Auth) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth {
    return func(ctx context.Context, client server.Client, req *server.ConnectRequest) (err error) {
        // ... 处理本插件的鉴权逻辑
        // 如果鉴权失败,则返回拒绝连接

        // 如果校验通过,再根据前一个插件的鉴权结果决定是否允许连接
        return pre(ctx, client, req)
    }
}

可以注意到,在这个例子中Auth掌握着前一个插件对应钩子函数的控制权,Auth可以自由的在前一个插件的OnBasicAuth执行前后注入任何逻辑。也就是说,虽然一个钩子函数可以同时被多个插件所使用,但是他们还是有主次之别的,这跟插件的加载顺序息息相关。

插件的加载顺序
插件的加载顺序受配置文件中plugin_order的控制:

# plugin loading orders
plugin_order:
  - auth
  - prometheus
  - admin

plugin_order保存的是插件的名称,数组顺序表示的就是插件的加载顺序。在Gmqtt中,越先加载的插件拥有越大的控制权。例如我们有A,B,C三个插件,都使用OnBasicAuth钩子函数,他们的加载顺序为A->B->C。 那么,在A插件的wrapper函数OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth中, pre包装了B和C两个插件的OnBasicAuth实现。B插件的wrapper包含了插件C的OnBasicAuth实现,而C中的wrapper只包含一个由Gmqtt指定的默认OnBasicAuth实现,A,B,C对OnBasicAuth的层层包装的关系如下图所示:

Gmqtt插件机制详解 - 图6

借助于wrapper模式,开发者可以通过多个组合多个插件来完成一系列控制。

如何编写插件

只要实现了Gmqtt的server.Plugin接口,就是一个Gmqtt的插件。为了简化插件开发,Gmqtt提供了插件模板生成工具,通过命令行可以快速的生成插件模板,令开发者可以更专注于业务实现。

使用gmqctl命令行工具

安装命令行工具:

$ go install github.com/DrmagicE/gmqtt/cmd/gmqctl

目前,gmqctl还只有生成插件模板这一个功能,可以通过gmqctl gen plugin –help查看基本使用方法:

$ gmqctl gen plugin --help
code generator

Usage:
  gmqctl gen plugin [flags]

Examples:
The following command will generate a code template for the 'awesome' plugin, which makes use of OnBasicAuth and OnSubscribe hook and enables the configuration in ./plugins directory.

gmqctl gen plugin -n awesome -H OnBasicAuth,OnSubscribe -c true -o ./plugins

Flags:
  -c, --config          Whether the plugin needs a configuration.
  -h, --help            help for plugin
  -H, --hooks string    The hooks use by the plugin, multiple hooks are separated by ','
  -n, --name string     The plugin name.
  -o, --output string   The output directory.

我们在Gmqtt的项目根目录下运行:

gmqctl gen plugin -n awesome -H OnBasicAuth,OnSubscribe -c true

上述命令会在plugin目录下生成如下几个文件:

$ tree ./plugin/awesome 
./plugin/awesome
├── awesome.go 
├── config.go  # 编写配置项相关逻辑
└── hooks.go # 编写钩子函数相关逻辑

我们逐个文件来分析,首先是awesome.go:

package awesome

import (
    "go.uber.org/zap"

    "github.com/DrmagicE/gmqtt/config"
    "github.com/DrmagicE/gmqtt/server"
)

var _ server.Plugin = (*Awesome)(nil)

const Name = "awesome"

func init() {
    // 注册本插件的构造函数。
    server.RegisterPlugin(Name, New)
    // 由于我们指定了-c true,表示本插件需要配置项。
    // 这里注册默认的配置项,当配置文件配置缺省时,启用默认配置。
    config.RegisterDefaultPluginConfig(Name, &DefaultConfig)
}

// New 是本插件的构造函数
func New(config config.Config) (server.Plugin, error) {
    panic("implement me")
}

var log *zap.Logger

// 实现Plugin接口的结构体。
type Awesome struct {
}

// Load 由Gmqtt按插件的导入顺序,依次执行。
// Load主要的作用就是把server.Server接口传递给插件。
func (a *Awesome) Load(service server.Server) error {
    log = server.LoggerWithField(zap.String("plugin", Name))
    panic("implement me")
}

// Unload 当broker退出时调用,可以做一些清理操作。
func (a *Awesome) Unload() error {
    panic("implement me")
}

func (a *Awesome) Name() string {
    return Name
}

然后是config.go:

package awesome

// Config is the configuration for the awesome plugin.
type Config struct {
    // 在这里,添加你需要的配置
}

// Validate validates the configuration, and return an error if it is invalid.
func (c *Config) Validate() error {
    // Validate方法用于校验配置是否合法。
    // Gmqtt会在导入配置阶段,执行每个插件的Validate方法,
    // 如果校验失败,则报错并停止启动。
    panic("implement me")
}

// DefaultConfig is the default configuration.
var DefaultConfig = Config{
    // 这里定义默认配置,当配置文件缺省时,使用默认配置。
}

func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
    // Gmqtt使用yaml作为配置文件,为了实现动态插拔,每个插件都要自定义自己的yaml解析逻辑。
    // 具体实现方式,可以参考其他内置插件。
    panic("implement me")
}

最后是hook.go:

package awesome

import (
    "github.com/DrmagicE/gmqtt/server"
)

func (a *Awesome) HookWrapper() server.HookWrapper {
    return server.HookWrapper{
        OnBasicAuthWrapper: a.OnBasicAuthWrapper,
        OnSubscribeWrapper: a.OnSubscribeWrapper,
    }
}

// 在刚才的命令中,我们声明要使用OnBasicAuth和OnSubscribe两个钩子函数,
// gmqctl已自动生成了模板,开发者自行填入业务逻辑即可。
func (a *Awesome) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth {
    panic("impermanent me")
}

func (a *Awesome) OnSubscribeWrapper(pre server.OnSubscribe) server.OnSubscribe {
    panic("impermanent me")
}

import插件并重新编译

新增的插件需要重新编译才能使用。Gmqtt统一的插件import文件: cmd/gmqttd/plugins.go

package main

import (
    // 在这里import所有的插件(为了调用对应的init方法)
    _ "github.com/DrmagicE/gmqtt/plugin/admin"
    _ "github.com/DrmagicE/gmqtt/plugin/auth"
    _ "github.com/DrmagicE/gmqtt/plugin/prometheus"
    _ "path/to/your/plugin" 
)

修改启动顺序

上文提到,插件的启动顺序收配置文件控制,只有在plugin_order中添加的插件才会被加载:

# plugin loading orders
plugin_order:
  - auth
  - prometheus
  - admin
  - your_plugin

插件配置修改

如果插件声明了使用配置,Gmqtt会从配置文件中为其加载配置,插件的配置存放在配置文件中的plugin.插件名空间下:

plugins:
  prometheus:
    path: "/metrics"
    listen_address: ":8082"
  admin:
    http:
      enable: true
      addr: :8083
    grpc:
      addr: 8084
  auth:
    # Password hash type. (plain | md5 | sha256 | bcrypt)
    # Default to MD5.
    hash: md5
    # The file to store password. Default to $HOME/gmqtt_password.yml
    # password_file:

至此,一个新的插件就开发完成了。大伙可以参考内置插件,以及项目的example目录获取更多示例。

如果你对本项目感兴趣,欢迎start支持,留言交流。