接下来我们就需要给Zinx添加消息队列和多任务Worker机制了。
我们可以通过worker的数量来限定处理业务的固定goroutine数量,而不是无限制的开辟Goroutine,虽然我们知道go的调度算法已经做的很极致了,但是大数量的Goroutine依然会带来一些不必要的环境切换成本,这些本应该是服务器应该节省掉的成本。
我们可以用消息队列来缓冲worker工作的数据。

初步我们的设计结构如下图:
八、Zinx的消息队列及多任务机制 - 图1

8.1 创建 消息队列

首先,处理消息队列的部分,我们应该集成到 MsgHandler模块下,因为属于我们消息管理模块范畴内:

zinx/znet/msghandler.go

type MsgHandle struct {
    Apis           map[uint32]ziface.IRouter  //存放每个MsgId 所对应的处理方法的map属性
    WorkerPoolSize uint32                     //业务工作Worker池的数量

    //新增:消息队列 管道
    TaskQueue      []chan ziface.IRequest     //Worker负责取任务的消息队列
}

func NewMsgHandle() *MsgHandle {
    return &MsgHandle{
        Apis: make(map[uint32]ziface.IRouter),
        WorkerPoolSize:utils.GlobalObject.WorkerPoolSize,
        //一个worker对应一个queue消息队列管道
        TaskQueue:make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize), 
    }
}

这里添加两个成员:
WokerPoolSize:作为工作池的数量,因为TaskQueue中的每个队列应该是和一个Worker对应的,所以我们在创建TaskQueue中队列数量要和Worker的数量一致。
TaskQueue:是一个Request请求信息的channel集合。用来缓冲提供worker调用的Request请求信息,worker会从对应的队列中获取客户端的请求数据并且处理掉
当然WorkerPoolSize最好也可以从GlobalObject获取,并且zinx.json配置文件可以手动配置。

zinx/utils/globalobj.go

/*
    存储一切有关Zinx框架的全局参数,供其他模块使用
    一些参数也可以通过 用户根据 zinx.json来配置
*/
type GlobalObj struct {
    /*
        Server
    */
    TcpServer ziface.IServer //当前Zinx的全局Server对象
    Host      string         //当前服务器主机IP
    TcpPort   int            //当前服务器主机监听端口号
    Name      string         //当前服务器名称

    /*
        Zinx
    */
    Version          string //当前Zinx版本号
    MaxPacketSize    uint32 //都需数据包的最大值
    MaxConn          int    //当前服务器主机允许的最大链接个数

    WorkerPoolSize   uint32 //业务工作Worker池的数量
    MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量

    /*
        config file path
    */
    ConfFilePath string
}

//...
//...

/*
    提供init方法,默认加载
*/
func init() {
    //初始化GlobalObject变量,设置一些默认值
    GlobalObject = &GlobalObj{
        Name:          "ZinxServerApp",
        Version:       "V0.4",
        TcpPort:       7777,
        Host:          "0.0.0.0",
        MaxConn:       12000,
        MaxPacketSize: 4096,
        ConfFilePath:  "conf/zinx.json",

        WorkerPoolSize: 10,
        MaxWorkerTaskLen: 1024,
    }

    //从配置文件中加载一些用户配置的参数
    GlobalObject.Reload()
}

8.2 创建及启动Worker工作池

现在添加Worker工作池,先定义一些启动工作池的接口

zinx/ziface/imsghandler.go

/*
    消息管理抽象层
 */
type IMsgHandle interface{
    DoMsgHandler(request IRequest)            //马上以非阻塞方式处理消息
    AddRouter(msgId uint32, router IRouter)    //为消息添加具体的处理逻辑

    //新增:
    StartWorkerPool()                        //启动worker工作池
    SendMsgToTaskQueue(request IRequest)    //将消息交给TaskQueue管道,传递给对应的worker进行处理
}

zinx/znet/msghandler.go

//启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
    fmt.Println("Worker ID = ", workerID, " is started.")

    //不断的取队列中的消息
    for {
        select {
            //有消息的话,则取出队列的Request,并执行request绑定的业务方法
            case request := <-taskQueue:
                mh.DoMsgHandler(request)
        }
    }
}

//启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {
    //遍历需要启动worker的数量,依此启动
    for i:= 0; i < int(mh.WorkerPoolSize); i++ {
        //一个worker被启动
        //给当前worker对应的任务队列开辟空间
        mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)

        //调用上面的方法:
        //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
        go mh.StartOneWorker(i, mh.TaskQueue[i])
    }
}

StartWorkerPool()方法是启动Worker工作池,这里根据用户配置好的WorkerPoolSize的数量来启动,然后分别给每个Worker分配一个TaskQueue,然后用一个goroutine来承载一个Worker的工作业务。

StartOneWorker()方法就是一个Worker的工作业务,每个worker是不会退出的(目前没有设定worker的停止工作机制),会永久的从对应的TaskQueue中等待消息,并处理。

8.3 发送消息 给 消息队列

现在,worker工作池已经准备就绪了,那么就需要有一个给到worker工作池消息的入口,我们再定义一个方法

zinx/ziface/imsghandler.go

//将 request消息 交给 TaskQueue ,由 worker进行处理
func (mh *MsgHandle)SendMsgToTaskQueue(request ziface.IRequest) {
    //根据ConnID来分配当前的连接应该由哪个worker负责处理
    //轮询的平均分配法则

    //得到需要处理此条连接的workerID
    workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
    fmt.Println("Add ConnID=", request.GetConnection().GetConnID()," request msgID=", request.GetMsgID(), "to workerID=", workerID)

    //将请求消息发送给任务队列
    mh.TaskQueue[workerID] <- request
}

SendMsgToTaskQueue()作为工作池的数据入口,这里面采用的是轮询的分配机制,因为不同链接信息都会调用这个入口,那么到底应该由哪个worker处理该链接的请求处理,这里用的是一个简单的求模运算。用余数和workerID的匹配来进行分配。
最终将request请求数据发送给对应worker的TaskQueue,那么对应的worker的Goroutine就会处理该链接请求了。

8.4 Zinx-V0.8代码实现

好了,现在需要将消息队列和多任务worker机制集成到我们Zinx的中了。我们在Server的Start()方法中,在服务端Accept之前,启动Worker工作池。

zinx/znet/server.go

//开启网络服务
func (s *Server) Start() {

    //...

    //开启一个go去做服务端Linster业务
    go func() {
        //0 启动worker工作池机制
        s.msgHandler.StartWorkerPool()

        //1 获取一个TCP的Addr
        addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
        if err != nil {
            fmt.Println("resolve tcp addr err: ", err)
            return
        }

        //...
        //...

        }
    }()
}

其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给Worker工作池进行处理。

所以应该在 Connection 的 StartReader()方法 中修改:

zinx/znet/connection.go

/*
    读消息Goroutine,用于从客户端中读取数据
 */
func (c *Connection) StartReader() {
    fmt.Println("Reader Goroutine is  running")
    defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
    defer c.Stop()

    for  {
        //创建拆包解包的对象...

        //读取客户端的Msg head...

        //拆包,得到msgid 和 datalen 放在msg中...

        //根据 dataLen 读取 data,放在msg.Data中...

        //得到当前客户端请求的Request数据
        req := Request{
            conn:c,
            msg:msg,
        }

        //已经启动工作池机制,将消息交给Worker处理
        if utils.GlobalObject.WorkerPoolSize > 0 {
            c.MsgHandler.SendMsgToTaskQueue(&req)
        } else { //如果没有配置工作池,就直接去执行
            //从绑定好的消息和对应的处理方法中直接去执行对应的Handle方法
            go c.MsgHandler.DoMsgHandler(&req)
        }
    }
}

这里并没有强制使用多任务Worker机制,而是判断用户配置WorkerPoolSize的个数,如果大于0,那么我就启动多任务机制处理链接请求消息,如果=0或者<0那么,我们依然只是之前的开启一个临时的Goroutine处理客户端请求消息。

8.5 使用Zinx-V0.8完成应用程序

测试代码和V0.6、V0.7的代码一样。因为Zinx框架对外接口没有发生改变。
我们分别启动Server、Client

$go run Server.go
$go run Client0.go
$go run Client1.go
$go run Client0.go

结果:
服务端:

$ go run Server.go 
Add api msgId =  0
Add api msgId =  1
[START] Server name: zinx v-0.8 demoApp,listenner at IP: 127.0.0.1, Port 7777 is starting
[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
Worker ID =  4  is started.
start Zinx server   zinx v-0.8 demoApp  succ, now listenning...
Worker ID =  9  is started.
Worker ID =  0  is started.
Worker ID =  5  is started.
Worker ID =  6  is started.
Worker ID =  1  is started.
Worker ID =  2  is started.
Worker ID =  7  is started.
Worker ID =  8  is started.
Worker ID =  3  is started.
Reader Goroutine is  running
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Reader Goroutine is  running
Add ConnID= 1  request msgID= 1 to workerID= 1
Call HelloZinxRouter Handle
recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Reader Goroutine is  running
Add ConnID= 2  request msgID= 0 to workerID= 2
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 1  request msgID= 1 to workerID= 1
Call HelloZinxRouter Handle
recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 2  request msgID= 0 to workerID= 2
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 1  request msgID= 1 to workerID= 1
Call HelloZinxRouter Handle
recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message

客户端0

$ go run Client0.go 
Client Test ... start
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping

客户端1

$ go run Client1.go 
Client Test ... start
==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8
==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8
==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8

客户端2

$ go run Client0.go 
Client Test ... start
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping