找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 1934|回复: 0
打印 上一主题 下一主题

php的memcache类分享(memcache队列)

[复制链接]

2617

主题

2617

帖子

7789

积分

论坛元老

Rank: 8Rank: 8

积分
7789
跳转到指定楼层
楼主
发表于 2018-2-14 05:59:28 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

            memcacheQueue.class.php
[U]复制代码[/U] 代码如下:
add('1asdf');
* $obj->getQueueLength();
* $obj->read(10);
* $obj->get(8);
*/
class memcacheQueue{
public static $client;   //memcache客户端连接
public   $access;   //队列是否可更新
private   $expire;   //过期时间,秒,1~2592000,即30天内
private   $sleepTime;   //等待解锁时间,微秒
private   $queueName;   //队列名称,唯一值
private   $retryNum;   //重试次数,= 10 * 理论并发数
public   $currentHead;  //当前队首值
public   $currentTail;  //当前队尾值
const MAXNUM  = 20000;    //最大队列数,建议上限10K
const HEAD_KEY = '_lkkQueueHead_';  //队列首kye
const TAIL_KEY = '_lkkQueueTail_';  //队列尾key
const VALU_KEY = '_lkkQueueValu_';  //队列值key
const LOCK_KEY = '_lkkQueueLock_';  //队列锁key
    /**
     * 构造函数
     * @param string $queueName  队列名称
     * @param int $expire 过期时间
     * @param array $config  memcache配置
     *
     * @return
     */
public function __construct($queueName ='',$expire=0,$config =''){
  if(empty($config)){
   self:client = memcache_pconnect('127.0.0.1',11211);
  }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
   self:client = memcache_pconnect($config['host'],$config['port']);
  }elseif(is_string($config)){//"127.0.0.1:11211"
   $tmp = explode(':',$config);
   $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
   $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
   self:client = memcache_pconnect($conf['host'],$conf['port']);
  }
  if(!self:client) return false;
  ignore_user_abort(true);//当客户断开连接,允许继续执行
  set_time_limit(0);//取消脚本执行延时上限
  $this->access = false;
  $this->sleepTime = 1000;
  $expire = empty($expire) ? 3600 : intval($expire)+1;
  $this->expire = $expire;
  $this->queueName = $queueName;
  $this->retryNum = 1000;
  $this->head_key = $this->queueName . self::HEAD_KEY;
  $this->tail_key = $this->queueName . self::TAIL_KEY;
  $this->lock_key = $this->queueName . self:OCK_KEY;
  $this->_initSetHeadNTail();
}
    /**
     * 初始化设置队列首尾值
     */
private function _initSetHeadNTail(){
  //当前队列首的数值
  $this->currentHead = memcache_get(self:client, $this->head_key);
  if($this->currentHead === false) $this->currentHead =0;
  //当前队列尾的数值
  $this->currentTail = memcache_get(self:client, $this->tail_key);
  if($this->currentTail === false) $this->currentTail =0;
}
    /**
     * 当取出元素时,改变队列首的数值
  * @param int $step 步长值
     */
private function _changeHead($step=1){
  $this->currentHead += $step;
  memcache_set(self:client, $this->head_key,$this->currentHead,false,$this->expire);
}
    /**
     * 当添加元素时,改变队列尾的数值
  * @param int $step 步长值
     * @param bool $reverse 是否反向
     * @return null
     */
private function _changeTail($step=1, $reverse =false){
  if(!$reverse){
   $this->currentTail += $step;
  }else{
   $this->currentTail -= $step;
  }
  memcache_set(self:client, $this->tail_key,$this->currentTail,false,$this->expire);
}
    /**
     * 队列是否为空
     * @return bool
     */
private function _isEmpty(){
  return (bool)($this->currentHead === $this->currentTail);
}
    /**
     * 队列是否已满
     * @return bool
     */
private function _isFull(){
  $len = $this->currentTail - $this->currentHead;
  return (bool)($len === self::MAXNUM);
}
    /**
     * 队列加锁
     */
private function _getLock(){
  if($this->access === false){
   while(!memcache_add(self:client, $this->lock_key, 1, false, $this->expire) ){
    usleep($this->sleepTime);
    @$i++;
    if($i > $this->retryNum){//尝试等待N次
     return false;
     break;
    }
   }
   $this->_initSetHeadNTail();
   return $this->access = true;
  }
  return $this->access;
}
    /**
     * 队列解锁
     */
private function _unLock(){
  memcache_delete(self:client, $this->lock_key, 0);
  $this->access = false;
}
    /**
     * 获取当前队列的长度
     * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度_initSetHeadNTail();
  return intval($this->currentTail - $this->currentHead);
}
    /**
     * 添加队列数据
     * @param void $data 要添加的数据
     * @return bool
     */
public function add($data){
  if(!$this->_getLock()) return false;
  if($this->_isFull()){
   $this->_unLock();
   return false;
  }
  $value_key = $this->queueName . self::VALU_KEY . strval($this->currentTail +1);
  $result = memcache_set(self::$client, $value_key, $data, MEMCACHE_COMPRESSED, $this->expire);
  if($result){
   $this->_changeTail();
  }
  $this->_unLock();
  return $result;
}
    /**
     * 读取队列数据
     * @param int $length 要读取的长度(反向读取使用负数)
     * @return array
     */
public function read($length=0){
  if(!is_numeric($length)) return false;
  $this->_initSetHeadNTail();
  if($this->_isEmpty()){
   return false;
  }
  if(empty($length)) $length = self::MAXNUM;//默认所有
  $keyArr = array();
  if($length >0){//正向读取(从队列首向队列尾)
   $tmpMin = $this->currentHead;
   $tmpMax = $tmpMin + $length;
   for($i= $tmpMin; $iqueueName . self::VALU_KEY . $i;
   }
  }else{//反向读取(从队列尾向队列首)
   $tmpMax = $this->currentTail;
   $tmpMin = $tmpMax + $length;
   for($i= $tmpMax; $i >$tmpMin; $i--){
    $keyArr[] = $this->queueName . self::VALU_KEY . $i;
   }
  }
  $result = @memcache_get(self::$client, $keyArr);
  return $result;
}
    /**
     * 取出队列数据
     * @param int $length 要取出的长度(反向读取使用负数)
     * @return array
     */
public function get($length=0){
  if(!is_numeric($length)) return false;
  if(!$this->_getLock()) return false;
  if($this->_isEmpty()){
   $this->_unLock();
   return false;
  }
  if(empty($length)) $length = self::MAXNUM;//默认所有
  $length = intval($length);
  $keyArr = array();
  if($length >0){//正向读取(从队列首向队列尾)
   $tmpMin = $this->currentHead;
   $tmpMax = $tmpMin + $length;
   for($i= $tmpMin; $iqueueName . self::VALU_KEY . $i;
   }
   $this->_changeHead($length);
  }else{//反向读取(从队列尾向队列首)
   $tmpMax = $this->currentTail;
   $tmpMin = $tmpMax + $length;
   for($i= $tmpMax; $i >$tmpMin; $i--){
    $keyArr[] = $this->queueName . self::VALU_KEY . $i;
   }
   $this->_changeTail(abs($length), true);
  }
  $result = @memcache_get(self::$client, $keyArr);
  foreach($keyArr as $v){//取出之后删除
   @memcache_delete(self::$client, $v, 0);
  }
  $this->_unLock();
  return $result;
}
    /**
     * 清空队列
     */
public function clear(){
  if(!$this->_getLock()) return false;
  if($this->_isEmpty()){
   $this->_unLock();
   return false;
  }
  $tmpMin = $this->currentHead--;
  $tmpMax = $this->currentTail++;
  for($i= $tmpMin; $iqueueName . self::VALU_KEY . $i;
   @memcache_delete(self::$client, $tmpKey, 0);
  }
  $this->currentTail = $this->currentHead = 0;
  memcache_set(self::$client, $this->head_key,$this->currentHead,false,$this->expire);
  memcache_set(self::$client, $this->tail_key,$this->currentTail,false,$this->expire);
  $this->_unLock();
}
/*
  * 清除所有memcache缓存数据
  */
public function memFlush(){
  memcache_flush(self::$client);
}
}//end class
            
            
您可能感兴趣的文章:
  • php Memcache 中实现消息队列
  • PHP下操作Linux消息队列完成进程间通信的方法
  • 队列在编程中的实际应用(php)
  • php中使用redis队列操作实例代码
  • PHP+memcache实现消息队列案例分享
  • php实现的双向队列类实例
  • PHP队列用法实例
  • PHP实现的memcache环形队列类实例
  • PHP消息队列用法实例分析
            
  • 分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
    收藏收藏
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    用户反馈
    客户端