【swoole.2.04】多进程示例:使用swoole实现多进程处理费时任务--弹性伸缩子程序

回顾

上一篇解决了回复僵尸进程的问题,但是在现实环境中,昂贵的服务器不会允许我们一直起多个worker进行处理,这里来解决一下怎么弹性伸缩worker。这里以消息队列中的待消费消息为例,当然也可以以其他指标为例,如cpu利用率,内存利用率等。

代码

class Job
{
    protected $masterPid;
    protected $workerNumber = 2;
    protected $maxWorkerNumber = 10;
    protected $queueKey = 1;
    protected $queueMod = 2 | \Swoole\Process::IPC_NOWAIT;//  异步非阻塞通信
    protected $table = null;
    protected $manegerProcess = null;

    public function __construct()
    {
        //  创建table内存保存子进程pid信息
        $this->table = new \Swoole\Table(1024);
        //  增加一列用来保存是否空闲
        $this->table->column('resting', \Swoole\Table::TYPE_INT);
        $this->table->create();
        //  获取主进程pid
        $this->masterPid = getmypid();
        //  创建初始worker
        for ($i = 0; $i < $this->workerNumber; $i++) {
            $process = $this->worker();
            var_dump("子进程开启:" . $process->pid);
        }
        //  maneger进程
        $this->manegerProcess = $this->maneger();
        var_dump("maneger进程开启:" . $process->pid);
        //  回收
        $this->wait();
    }

    public function worker()
    {
        $process = new \Swoole\Process(function (\Swoole\Process $process) {
            while (true) {
                $msg = $process->pop();
                if ($msg) {
                    //  设置该子进程为工作中
                    $this->table->set($process->pid, ['resting' => 0]);
                    sleep(1);   //  处理耗时任务的操作
                    var_dump(sprintf('%s完成了一个任务:%s,当前共有%s个子进程,剩余%s个任务', $process->pid, $msg, $this->table->count(), $process->statQueue()['queue_num']));
                } else {
                    //  设置该子进程为空闲
                    $this->table->set($process->pid, ['resting' => 1]);
                }
            }
        });
        $process->useQueue($this->queueKey, $this->queueMod);
        $process->start();
        return $process;
    }

    public function maneger()
    {
        $process = new \Swoole\Process(function (\Swoole\Process $process) {
            var_dump($process->pid . 'maneger开始');
            while (true) {
                //  不定时增加消息
                sleep(1);
                if (rand(0, 10) < 8) {
                    for ($i = 0; $i < 20; $i++) {
                        $process->push(1);
                    }
                    var_dump('投递了10个任务)');
                }
                if ($process->statQueue()['queue_num'] > 100) {
                    sleep(15);
                }
            }
        });
        $process->useQueue($this->queueKey, $this->queueMod);
        $process->start();

        return $process;
    }

    public function wait()
    {
        //  信号监听
        \Swoole\Process::signal(SIGCHLD, function ($sig) {
            while ($res = \Swoole\Process::wait(false)) {
                var_dump("回收子进程", $res);
            }
        });

        \Swoole\Timer::tick(1000, function () {
            //  监控子进程数量,弹性增加子进程
            //  当当前消息队列中的数据大于某个值,并且子程序没有超过上限的时候,创建子程序加速消费
            if ($this->manegerProcess->statQueue()['queue_num'] > $this->maxWorkerNumber * 2
                && $this->table->count() <= $this->maxWorkerNumber
            ) {
                /**
                 * 不能直接在定时器中使用process
                 * process无法在协程环境中使用
                 * swoole在默认配置下是开启协程环境的,可以通过enable_coroutine配置来关闭
                 * 关于默认开启协程环境的地方:
                 * https://wiki.swoole.com/wiki/page/949.html
                 */
//                $this->worker();
                \Swoole\Process::kill($this->masterPid, SIGRTMAX);
            }
        });

        \Swoole\Process::signal(SIGRTMAX, function ($sig) {
            $process = $this->worker();
            var_dump("子进程开启:" . $process->pid);
        });

        \Swoole\Timer::tick(5000, function () {
            var_dump("开始回收检测");
            //  消息队列中没有消息的时候可以消费了就开始清除空闲
            if ($this->manegerProcess->statQueue()['queue_num'] == 0) {
                foreach ($this->table as $k => $v) {
                    //  仅保留必要队列
                    if ($v['resting'] && $this->table->count() > $this->workerNumber) {
                        var_dump("清除空闲进程:" . $k);
                        \Swoole\Process::kill($k);
                        $this->table->del($k);
                    }
                }
            }
        });
    }
}

new Job();

代码分析

实现思路

在之前的生产消费模型上增加了一层用来控制消费者数量和回收消费者进程的程序作为master,而原来的生产者作为一个(或一组)子程序作为maneger,worker不变。通过一个统一的消息队列由maneger来做任务投递,master对程序运行状况做出评估并进行弹性的伸缩。

如何共享变量

因为多进程中各自进程之间变量是隔离的,所以需要使用第三方来存储共享数据,如代码中各个进程的运行状态,是否空闲。可以使用memcache,redis等消息中间件,也可以使用mysql保存,这里我选择了swoole自带的内存操作模块 table。因为table是基于本地内存共享架构,比起其他第三方进行交互通信的工具更快速,更小巧,具体使用方法可以查看文档,这里不赘述了。

其他变动

主进程在启动后会做以下监控

  1. 回收正常结束的进程
  2. 利用定时器监测程序运行状态,在有需要的情况下增加worker来保证消费速度
  3. 利用定时器监测消息队列中的任务数量,在合理的时候回收不必要的worker进程,减少系统开销

注意事项

需要注意的是swoole并不支持在协程环境进行创建程序的操作,所以并不能在定时器中进行进程的创建。此时可以选择一个未被使用的信号对信号进行监控,主进程在监控到信号时在回调事件中创建进程即可。

有的人可能会说,我并没有开启协程啊,swoole中协程不是用go(function())来创建的吗?

swoole中是有默认的协程环境的,具体可以阅读此篇文档 enable_coroutine

在未特殊配置的情况下swoole在部分功能中是默认开启协程环境的,其中就包括Timer::tick

程序幼儿员-龚学鹏
请先登录后发表评论
  • latest comments
  • 总共0条评论