最近项目中用到好多用异步队列处理的后台任务,有些心得,记录一下。
下面引用百度百科的对队列的解释:
队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。 简单点讲就是:先进先出
要实现一个队列,你可以根据自身服务器资源,可选数据库或者redis或者其他更高级的队列资源来实现。
简单实现,大概代码是这样子的:
class Queue{
protected $items = [];
/**
* 从队列尾部插入数据
* @param string $key 对列名称
*/
public function push($key, $value){
isset($this->items[$key]) or $this->items[$key] = [];
return array_push($this->items[$key], $value);
}
/**
* 从队列头部取出数据
* @param string $key 对列名称
* return mixed $value 数据
*/
public function pop($key){
isset($this->items[$key]) or $this->items[$key] = [];
return array_shift($this->items[$key]);
}
}
测试:
$queue = new Queue();
var_dump($queue->pop('list_1')); //输出 NULL
$queue->push('list_1', '1');
$queue->push('list_1', '2');
$queue->push('list_1', '3');
var_dump($queue->pop('list_1')); //输出 string(1) "1"
var_dump($queue->pop('list_1')); //输出 string(1) "2"
上面是一个直接存储在数组中的队列,只能在一个php生命周期中使用。我们可以把存储的方式改成别的,例如数据库或者redis什么的。
这里我们使用redis,用到redis的有序列表(list)
class RedisQueue{
protected $client; //这里使用了predis这个库连接redis
public function __construct(\Predis\Client $client)
{
$this->client = $client;
}
/**
* 从队列尾部插入数据
* @param string $key 对列名称
*/
public function push($key, $value){
return $this->client->rpush($key, $value); //从右边入队
}
/**
* 从队列头部取出数据
* @param string $key 对列名称
* @return mixed $value 数据
*/
public function pop($key){
return $this->client->lpop($key); //从左边取
}
}
你还是可以使用上面的例子测试下。
当然了,还有很多情况下,我们的队列要延时出队,我们就要使用redis的另外两种数据类型:有序集合zset和哈希hlist
最终代码如下:
class RedisQueue{
protected $client;
public function __construct(\Predis\Client $client)
{
$this->client = $client;
}
/**
* 从队列尾部插入数据
* @param string $key 对列名称
* @param int $delay 延迟多少秒
* @return mixed $value 数据
*/
public function push($key, $value, $delay = null){
if(is_null($delay)){ //不使用延迟的时候还是使用以前的rpush入队
return $this->client->rpush($key, $value);
}
$time = time() + $delay;
if($time > time()){ //还未到该入队的时间时
$hash_key = md5($this->randString(16).'_'.time().'_'.$value); //生成一个唯一key
return $this->client->transaction(function($tx) use ($key, $time, $hash_key, $value){ //使用reids事务
/** @var \Predis\Client $tx */
$tx->zadd($key.':zset', [
$hash_key=>$time //这里以生成的唯一key做 对象member 以time做分数score
]);
$tx->hset($key.':hlist', $hash_key, $value); //并且保存唯一key和值的映射
$tx->expire($key.':zset', 7*86400); //有效期7天
$tx->expire($key.':hlist', 7*86400);
});
}
}
/**
* 取出数据
* @param string $key 对列名称
*/
public function receive($key){
if($this->getLock($key.':lock')){ //使用锁保证线程安全
$keys = $this->client->zrangebyscore($key.':zset',0, time()); //按当前时间取分数小于等于当前时间的集合元素
if(!empty($keys)) {
$message_datas = $this->client->hmget($key.':hlist', $keys); //通过这些key拿到值
$this->client->transaction(function($tx) use ($key, $keys, $message_datas){ //继续使用redis事务保证数据完整
/** @var \Predis\Client $tx */
foreach ($message_datas as $i => $message_data){
$tx->rpush($key, $message_data); //这时候才是真正的入队
$hash_key = $keys[$i];
$tx->hdel($key.':hlist', $hash_key); //从hash列表删除元素
$tx->zrem($key.':zset', $hash_key); //从集合删除元素
}
});
}
$this->releaseLock($key.':lock');//释放锁
}
return $this->client->lpop($key); //出队
}
public function getLock($key){
$ret = true;
if($this->client->incr($key) != 1){
$ret = false;
}
$ttl = $this->client->ttl($key);
if($ttl == -1) { //forever
$this->client->expire($key, 60);
}
return $ret;
}
public function releaseLock($key){
$this->client->del($key);
}
protected function randString($length){
$str = '';
$strPol = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz";
$max = strlen($strPol) - 1;
for ($i = 0; $i < $length; $i++) {
$str .= $strPol[mt_rand(0, $max)];
}
return $str;
}
}
主要看上面入队(push)和出队(receive)方法,我代码写了详细的注释。
评论