用Redis实现PHP异步队列

最近项目中用到好多用异步队列处理的后台任务,有些心得,记录一下。

下面引用百度百科的对队列的解释:

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。 简单点讲就是:先进先出

要实现一个队列,你可以根据自身服务器资源,可选数据库或者redis或者其他更高级的队列资源来实现。

简单实现,大概代码是这样子的:

class Queue{
    protected $items = [];

    /**
     * 从队列尾部插入数据
     * @param string $key 对列名称
     */
    public function push($key, $value){
        isset($this->items[$key]) or $this->items[$key] = [];
        return array_push($this->items[$key], $value);
    }

    /**
     * 从队列头部取出数据
     * @param string $key 对列名称
     * return mixed $value 数据
     */
    public function pop($key){
        isset($this->items[$key]) or $this->items[$key] = [];
        return array_shift($this->items[$key]);
    }
}

测试:

$queue = new Queue();

var_dump($queue->pop('list_1'));    //输出 NULL

$queue->push('list_1', '1');
$queue->push('list_1', '2');
$queue->push('list_1', '3');

var_dump($queue->pop('list_1'));    //输出 string(1) "1"
var_dump($queue->pop('list_1'));    //输出 string(1) "2"

上面是一个直接存储在数组中的队列,只能在一个php生命周期中使用。我们可以把存储的方式改成别的,例如数据库或者redis什么的。

这里我们使用redis,用到redis的有序列表(list)

class RedisQueue{
    protected $client;  //这里使用了predis这个库连接redis
    public function __construct(\Predis\Client $client)
    {
        $this->client = $client;
    }

    /**
     * 从队列尾部插入数据
     * @param string $key 对列名称
     */
    public function push($key, $value){
        return $this->client->rpush($key, $value);  //从右边入队
    }

    /**
     * 从队列头部取出数据
     * @param string $key 对列名称
     * @return mixed $value 数据
     */
    public function pop($key){
        return $this->client->lpop($key);   //从左边取
    }
}

你还是可以使用上面的例子测试下。

当然了,还有很多情况下,我们的队列要延时出队,我们就要使用redis的另外两种数据类型:有序集合zset和哈希hlist

最终代码如下:

class RedisQueue{

    protected  $client;
    public function __construct(\Predis\Client $client)
    {
        $this->client = $client;
    }

    /**
     * 从队列尾部插入数据
     * @param string $key 对列名称
     * @param int $delay 延迟多少秒
     * @return mixed $value 数据
     */
    public function push($key, $value, $delay = null){
        if(is_null($delay)){    //不使用延迟的时候还是使用以前的rpush入队
            return $this->client->rpush($key, $value);
        }
        $time = time() + $delay;
        if($time > time()){     //还未到该入队的时间时
            $hash_key = md5($this->randString(16).'_'.time().'_'.$value);       //生成一个唯一key
            return $this->client->transaction(function($tx) use ($key, $time, $hash_key, $value){   //使用reids事务
                /** @var \Predis\Client $tx */
                $tx->zadd($key.':zset', [   
                    $hash_key=>$time    //这里以生成的唯一key做 对象member 以time做分数score
                ]);
                $tx->hset($key.':hlist', $hash_key, $value);    //并且保存唯一key和值的映射
                $tx->expire($key.':zset', 7*86400);     //有效期7天
                $tx->expire($key.':hlist', 7*86400);
            });
        }
    }

    /**
     * 取出数据
     * @param string $key 对列名称
     */
    public function receive($key){
        if($this->getLock($key.':lock')){   //使用锁保证线程安全
            $keys = $this->client->zrangebyscore($key.':zset',0, time());   //按当前时间取分数小于等于当前时间的集合元素
            if(!empty($keys)) {
                $message_datas = $this->client->hmget($key.':hlist', $keys);    //通过这些key拿到值
                $this->client->transaction(function($tx) use ($key, $keys, $message_datas){ //继续使用redis事务保证数据完整
                    /** @var \Predis\Client $tx */
                    foreach ($message_datas as $i => $message_data){
                        $tx->rpush($key, $message_data); //这时候才是真正的入队
                        $hash_key = $keys[$i];
                        $tx->hdel($key.':hlist', $hash_key);    //从hash列表删除元素
                        $tx->zrem($key.':zset', $hash_key); //从集合删除元素
                    }
                });
            }
            $this->releaseLock($key.':lock');//释放锁
        }
        return $this->client->lpop($key); //出队
    }

    public function getLock($key){
        $ret = true;
        if($this->client->incr($key) != 1){
            $ret = false;
        }
        $ttl = $this->client->ttl($key);
        if($ttl == -1) {    //forever
            $this->client->expire($key, 60);
        }
        return $ret;
    }

    public function releaseLock($key){
        $this->client->del($key);
    }
    
    protected function randString($length){
        $str = '';
        $strPol = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz";
        $max = strlen($strPol) - 1;

        for ($i = 0; $i < $length; $i++) {
            $str .= $strPol[mt_rand(0, $max)];
        }
        return $str;
    }
}

主要看上面入队(push)和出队(receive)方法,我代码写了详细的注释。

评论