Concurrent queue processing system implemented with Swoole and Redis

PHP has no built-in multi-threading, yet a complete system needs many operations to run asynchronously. To handle these asynchronous operations, we built a Redis-based queue task system.

As we all know, a message queue processing system is mainly divided into two parts: consumers and producers.

In our system, the main system acts as a producer and the task system acts as a consumer.

The specific workflow is as follows:

1. The main system pushes the name of the task that needs to be processed + task parameters to the queue.

2. The task system polls the task queue continuously: it forks a child process, and the child pops a task and runs the corresponding task logic.

The specific code is as follows:

/**
 * Daemon entry point: keep forking workers forever.
 */
public function runAction() {
    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
    while (true) {
        $this->fork_process();
    }
}

/**
 * Fork one child to handle a single task and wait for it to finish.
 */
private function fork_process() {
    $ppid = getmypid();
    $pid = pcntl_fork();
    if ($pid == -1) {// fork failed: log it and let the caller try again
        Tools::log_message('ERROR', 'daemon/run fork fail' . '|pid:' . $ppid, 'daemon-');
    } elseif ($pid == 0) {// child process
        $pid = posix_getpid();
       //echo "* Process {$pid} was created\n";
        $this->mq_process();
        exit;
    } else {// parent process
        $pid = pcntl_wait($status, WUNTRACED);// block until the child exits
        if (pcntl_wifexited($status)) {
           //echo "\n\n* Sub process: {$pid} exited with {$status}";
           //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
        } else {
            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
        }
    }
}

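/**
 * Pop one task from the queue and dispatch it to its handler method.
 */
private function mq_process() {
    $data_pop = $this->masterRedis->rPop($this->redis_list_key);
    if ($data_pop === FALSE) {// the queue is empty
        return FALSE;
    }
    $data = json_decode($data_pop, TRUE);
    if (!$data) {// the payload is not valid JSON
        return FALSE;
    }
    $worker = '_task_' . $data['worker'];// name of the handler method
    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
    $params = $data['params'];
    $class = new $class_name();
    $class->$worker($params);
    return TRUE;
}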
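
The consumer above expects each queue entry to be a JSON object with a worker name, an optional class name, and the task parameters. As a rough sketch of that payload format, the following shows a producer-side encoder and a consumer-side dispatcher that rejects unknown handlers. The names `DemoTaskModel`, `build_task`, and `run_task` are hypothetical, and a plain PHP array stands in for the Redis list so the example runs without a Redis server.

```php
<?php
// Hypothetical task class; stands in for the TaskproModel used by the consumer.
class DemoTaskModel
{
    public function _task_send_mail(array $params): string
    {
        return "mail to {$params['to']}";
    }
}

// Producer side: encode the task name and parameters as one queue entry.
function build_task(string $worker, array $params, ?string $class = null): string
{
    $task = ['worker' => $worker, 'params' => $params];
    if ($class !== null) {
        $task['class'] = $class;
    }
    return json_encode($task);
}

// Consumer side: decode an entry and dispatch it; return false for bad input.
function run_task(string $payload, string $defaultClass)
{
    $data = json_decode($payload, true);
    if (!is_array($data) || !isset($data['worker'])) {
        return false;
    }
    $class  = $data['class'] ?? $defaultClass;
    $method = '_task_' . $data['worker'];
    if (!class_exists($class) || !method_exists($class, $method)) {
        return false;
    }
    return (new $class())->$method($data['params'] ?? []);
}

$queue = [];                                                                // in-memory stand-in for the Redis list
array_unshift($queue, build_task('send_mail', ['to' => 'a@example.com'])); // plays the role of lPush
$result = run_task(array_pop($queue), 'DemoTaskModel');                    // plays the role of rPop
echo $result, "\n";
```

Checking `method_exists` before dispatching is an addition of this sketch, not something the original consumer does; it simply avoids a fatal error when a payload names a handler that does not exist.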
 

This is a simple task processing system.

This task system has helped us achieve asynchrony, and it has been running stably for nearly a year so far.

Unfortunately, it is a single-process system. It forks in a loop: if a task is waiting, the child processes it; if not, the child exits and the loop continues.

This design is very stable.

But it has two problems: the constant fork-and-pop cycle wastes server resources, and the system does not support concurrency.

The first problem is tolerable, but the second is serious.

When the main system pushes a large number of tasks at once, they are processed one at a time, so the wait for the last task grows in proportion to the backlog.
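
To put a rough number on that, here is a back-of-the-envelope estimate. The figures (10,000 queued tasks, 2 seconds per task, 100 concurrent workers) are illustrative assumptions, the function name `estimate_seconds` is hypothetical, and the model ignores fork and Redis overhead entirely.

```php
<?php
// Rough wall-clock estimate: tasks run in waves of at most $concurrency.
function estimate_seconds(int $tasks, float $secondsPerTask, int $concurrency): float
{
    return ceil($tasks / $concurrency) * $secondsPerTask;
}

$serial   = estimate_seconds(10000, 2.0, 1);   // one task at a time
$parallel = estimate_seconds(10000, 2.0, 100); // 100 tasks at a time
printf("serial: %.0f s, 100-way: %.0f s\n", $serial, $parallel);
```

Under these assumptions the serial system needs about five and a half hours to drain the backlog, while 100 concurrent workers need a few minutes.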

New design

To solve the concurrency problem, we plan to build a more efficient and more powerful queue processing system.

Because PHP before PHP 7 does not support multi-threading out of the box, we use multiple processes instead.

I searched the Internet extensively, and most of the so-called multi-process solutions simply keep N worker processes running in the background permanently.

That is not what we need.

What I want is this: each time a task is popped, a child process is forked for it, and the child exits as soon as the task finishes.

Problems encountered

1. How to control the maximum number of processes

The idea is simple: increment a counter each time a child process is forked, and decrement it when a child process finishes.

Incrementing is no problem, since it happens in the main process. But where does the decrement happen?

You might say: in the child process, of course. But note that fork gives the child its own copy of the main process's memory, so the child cannot modify the counter held by the main process!

This is where a piece of background knowledge comes in: signals.

You can look up the details yourself; here we go straight to the code.
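
A minimal sketch of that pitfall, assuming the pcntl extension is available on the CLI. The variable `$counter` is a hypothetical stand-in for the process counter.

```php
<?php
// Requires the pcntl extension (CLI only).
$counter = 1; // pretend one child is already being tracked

$pid = pcntl_fork();
if ($pid === -1) {
    exit("fork failed\n");
}
if ($pid === 0) {
    // Child: this changes the child's private copy only.
    $counter--;
    exit(0);
}

// Parent: wait for the child, then inspect its own copy of the counter.
pcntl_waitpid($pid, $status);
echo "counter in parent: {$counter}\n";
```

The parent still sees 1 after the child has exited, which is why the decrement has to happen in the parent.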

//install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));
 

This installs a signal handler. One piece is still missing, though.

declare(ticks = 1);
 

declare is a control structure; see the PHP manual for its full usage.

This statement makes PHP raise a tick after each low-level statement it executes, and pcntl uses those ticks to dispatch any pending signals to the registered handler.

As a result, the signal handler is called whenever a child process ends, and we can decrement the counter inside the handler.
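
To see tick-based dispatch in isolation, the following sketch sends a signal to its own process and lets the next tick deliver it. It assumes the pcntl and posix extensions; `SIGUSR1` and the `$handled` counter are chosen only for illustration and are not part of the task system.

```php
<?php
declare(ticks = 1); // dispatch pending signals after each low-level statement

$handled = 0;
pcntl_signal(SIGUSR1, function (int $signo) use (&$handled): void {
    $handled++; // runs in this process, so it can update the variable
});

posix_kill(posix_getpid(), SIGUSR1); // the signal is now pending
$noop = true;                        // any further statement triggers a tick
echo "handled = {$handled}\n";
```

Without the `declare(ticks = 1)` line (and without an alternative such as calling `pcntl_signal_dispatch()` explicitly), the handler would not be invoked.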

2. How to deal with leftover processes

In multi-process development, child processes that are not handled properly are left behind as zombie processes.

To avoid leftover processes, the child processes must be reaped.

So how to reap the child processes is the key technical point.

The pcntl demos, and many blog posts, reap the child processes in the main process.

But our loop is built on Redis's brpop, and brpop blocks.

This causes a problem: after N tasks have been dispatched and the queue goes idle, the main process is blocked in brpop while the last few child processes are still running, so it never gets around to reaping them.

I was stuck on this for a long time, but once I had the signal handler in place, it became very simple.

Reaping is done in the signal handler as well.
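
Putting the two points together, here is a minimal pcntl-only sketch in which the SIGCHLD handler both reaps exited children and decrements the counter. It is an illustration under assumptions, not the system's actual code: it uses `pcntl_waitpid` with `WNOHANG` instead of Swoole, the `$running` counter is hypothetical, and `usleep` stands in for real task work.

```php
<?php
declare(ticks = 1); // let PHP dispatch pending signals between statements

$running = 0; // number of live children, owned by the parent

pcntl_signal(SIGCHLD, function (int $signo) use (&$running): void {
    // Several children may exit before the handler runs, so reap in a loop.
    while (pcntl_waitpid(-1, $status, WNOHANG) > 0) {
        $running--;
    }
});

for ($i = 0; $i < 3; $i++) {
    $pid = pcntl_fork();
    if ($pid === -1) {
        exit("fork failed\n");
    }
    if ($pid === 0) {
        usleep(100000 * ($i + 1)); // child: simulate a short task
        exit(0);
    }
    $running++; // parent: one more child to track
}

// The parent never calls wait itself; the handler does all the reaping.
while ($running > 0) {
    usleep(50000);
}
echo "all children reaped, running = {$running}\n";
```

Because the handler loops until no more exited children are reported, it still reaps everything when several children exit close together and their signals are coalesced into one.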

Evaluation of the new system

pcntl is PHP's process control extension, but unfortunately its support for multiple processes is quite limited.

Therefore, we use the Process class from the Swoole extension here.

The specific code is as follows:

declare(ticks = 1);
class JobDaemonController extends Yaf_Controller_Abstract{

    use Trait_Redis;

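    private $maxProcesses = 800;// upper limit on concurrent child processes
    private $child = 0;// number of child processes currently running
    private $masterRedis;
    private $redis_task_wing = 'task:wing';// Redis list key of the task queue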

    public function init(){
       //install signal handler for dead kids
        pcntl_signal(SIGCHLD, array($this, "sig_handler"));
        set_time_limit(0);
        ini_set('default_socket_timeout', -1);// never time out on socket reads; avoids the Redis "read error on connection"
    }

    private function redis_client(){
        $rds = new Redis();
        $rds->connect('redis.master.host',6379);
        return $rds;
    }

    public function process(swoole_process $worker){// runs in the child process
        $GLOBALS['worker'] = $worker;
        swoole_event_add($worker->pipe, function($pipe) {
            $worker = $GLOBALS['worker'];
            $recv = $worker->read();           // receive the task data sent by the master

            sleep(rand(1, 3));                 // simulate a task that takes 1 to 3 seconds
            echo "From Master: $recv\n";
            $worker->exit(0);
        });
        exit;
    }

    public function testAction(){
        for ($i = 0; $i < 10000; $i++){
            $data = [
                'abc' => $i,
                'timestamp' => time().rand(100,999)
            ];
            $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
        }
        exit;
    }

    public function runAction(){
        while (1){
//           echo "\t now we have $this->child child processes\n";
            if ($this->child < $this->maxProcesses){
                $rds = $this->redis_client();
                $data_pop = $rds->brpop($this->redis_task_wing, 3);// block for up to 3 seconds waiting for a task
                if (!$data_pop){
                    continue;
                }
                echo "\t Starting new child | now we have $this->child child processes\n";
                $this->child++;
                $process = new swoole_process([$this, 'process']);
                $process->write(json_encode($data_pop));
                $pid = $process->start();
            } else {
                usleep(10000);// at the limit: pause briefly instead of spinning until a child exits
            }
        }
    }

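    public function sig_handler($signo) {// public so that it can be invoked as a signal callback
//       echo "Receive: $signo\r\n";
        switch ($signo) {
            case SIGCHLD:
                // reap every child that has exited and free its slot
                while($ret = swoole_process::wait(false)) {
//                   echo "PID={$ret['pid']}\n";
                    $this->child--;
                }
                break;
        }
    }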
}
 

In testing, a single-core server with 1 GB of memory sustained 800 concurrent child processes, each running a task that takes 1 to 3 seconds.