• linkedu视频
  • 平面设计
  • 电脑入门
  • 操作系统
  • 办公应用
  • 电脑硬件
  • 动画设计
  • 3D设计
  • 网页设计
  • CAD设计
  • 影音处理
  • 数据库
  • 程序设计
  • 认证考试
  • 信息管理
  • 信息安全
菜单
linkedu.com
  • 网页制作
  • 数据库
  • 程序设计
  • 操作系统
  • CMS教程
  • 游戏攻略
  • 脚本语言
  • 平面设计
  • 软件教程
  • 网络安全
  • 电脑知识
  • 服务器
  • 视频教程
  • MsSql
  • Mysql
  • oracle
  • MariaDB
  • DB2
  • SQLite
  • PostgreSQL
  • MongoDB
  • Redis
  • Access
  • 数据库其它
  • sybase
  • HBase
您的位置:首页 > 数据库 >Redis > 详解Redis用链表实现消息队列

详解Redis用链表实现消息队列

作者: 字体:[增加 减小] 来源:互联网 时间:2017-05-11

通过本文主要向大家介绍了redis配置文件详解,redis详解,redis info详解,redis命令详解,redis配置详解等相关知识,希望本文的分享对您有所帮助

前言

Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样。

链表实现消息队列

Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型。可以利用lpush和rpop来实现。但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源,好在Redis提供一种阻塞版pop命令brpop或者blpop,用法为brpop/blpop list timeout, 当链表为空的时候,brpop/blpop将阻塞,直到设置超时时间到或者list插入一个元素。

用法如下:

charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli
127.0.0.1:6379> lpush list hello
(integer) 1
127.0.0.1:6379> brpop list 0
1) "list"
2) "hello"
127.0.0.1:6379> brpop list 0
//阻塞在这里
/* ---------------------------------------------------- */
//当我在另一个客户端lpush一个元素之后,客户端输出为
127.0.0.1:6379> brpop list 0
1) "list"
2) "world"
(50.60s)//阻塞的时间
</div>

当链表为空的时候,brpop是阻塞的,等待超时时间到或者另一个客户端lpush一个元素。接下来,看下源码是如何实现阻塞brpop命令的。要实现客户端阻塞,只需要服务器不给客户端发送消息,那么客户端就会阻塞在read调用中,等待消息到达。这是很好实现的,关键是如何判断这个客户端阻塞的链表有数据到达以及通知客户端解除阻塞?Redis的做法是,将阻塞的键以及阻塞在这个键上的客户端链表存储在一个字典中,然后每当向数据库插入一个链表时,就判断这个新插入的链表是否有客户端阻塞,有的话,就解除这个阻塞的客户端,并且发送刚插入链表元素给客户端,客户端就这样解除阻塞。

先看下有关数据结构,以及server和client有关属性

//阻塞状态
typedef struct blockingState {
 /* Generic fields. */
 mstime_t timeout;  /* 超时时间 */
 /* REDIS_BLOCK_LIST */
 dict *keys;    /* The keys we are waiting to terminate a blocking
        * operation such as BLPOP. Otherwise NULL. */
 robj *target;   /* The key that should receive the element,
        * for BRPOPLPUSH. */
 /* REDIS_BLOCK_WAIT */
 int numreplicas;  /* Number of replicas we are waiting for ACK. */
 long long reploffset; /* Replication offset to reach. */
} blockingState;
//继续列表
typedef struct readyList {
 redisDb *db;//就绪键所在的数据库
 robj *key;//就绪键
} readyList;
//客户端有关属性
typedef struct redisClient {
 int btype;    /* Type of blocking op if REDIS_BLOCKED. */
 blockingState bpop;  /* blocking state */
}
//服务器有关属性
struct redisServer {
  /* Blocked clients */
 unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
 list *unblocked_clients; /* list of clients to unblock before next loop */
 list *ready_keys;  /* List of readyList structures for BLPOP & co */
}
//数据库有关属性
typedef struct redisDb {
  //keys->redisCLient映射
  dict *blocking_keys;  /* Keys with clients waiting for data (BLPOP) */
 dict *ready_keys;   /* Blocked keys that received a PUSH */
}redisDB
</div>

必须对上述的数据结构足够了解,否则很难看懂下面的代码,因为这些代码需要操作上述的数据结构。先从brpop命令执行函数开始分析,brpop命令执行函数为

void brpopCommand(redisClient *c) {
 blockingPopGenericCommand(c,REDIS_TAIL);
}
//++++++++++++++++++++++++++++++++++++++++++++++++++
void blockingPopGenericCommand(redisClient *c, int where) {
 robj *o;
 mstime_t timeout;
 int j;
 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
  != REDIS_OK) return;//将超时时间保存在timeout中
 for (j = 1; j < c->argc-1; j++) {
  o = lookupKeyWrite(c->db,c->argv[j]);//在数据库中查找操作的链表
  if (o != NULL) {//如果不为空
   if (o->type != REDIS_LIST) {//不是链表类型
    addReply(c,shared.wrongtypeerr);//报错
    return;
   } else {
    if (listTypeLength(o) != 0) {//链表不为空
     /* Non empty list, this is like a non normal [LR]POP. */
     char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
     robj *value = listTypePop(o,where);//从链表中pop出一个元素
     redisAssert(value != NULL);
     //给客户端发送pop出来的元素信息
     addReplyMultiBulkLen(c,2);
     addReplyBulk(c,c->argv[j]);
     addReplyBulk(c,value);
     decrRefCount(value);
     notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
          c->argv[j],c->db->id);
     if (listTypeLength(o) == 0) {//如果链表为空,从数据库删除链表
      dbDelete(c->db,c->argv[j]);
      notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
           c->argv[j],c->db->id);
     }
     /* 省略一部分 */
    }
   }
  }
 }
  /* 如果链表为空,则阻塞客户端 */
  blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}
</div>

从源码可以看出,brpop可以操作多个链表变量,例如brpop list1 list2 0,但是只能输出第一个有元素的链表。如果list1没有元素,而list2有元素,则输出list2的元素;如果两个都有元素,则输出list1的元素;如果都没有元素,则等待其中某个链表插入一个元素,之后在2返回。最后调用blockForyKeys阻塞

void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
 dictEntry *de;
 list *l;
 int j;
 c->bpop.timeout = timeout;//超时时间赋值给客户端blockingState属性
 c->bpop.target = target;//这属性适用于brpoplpush命令的输入对象,如果是brpop, //则target为空
 if (target != NULL) incrRefCount(target);//不为空,增加引用计数
 for (j = 0; j < numkeys; j++) {
  /* 将阻塞的key存入c.bpop.keys字典中 */
  if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
  incrRefCount(keys[j]);
  /* And in the other "side", to map keys -> clients */
  //将阻塞的key和客户端添加进c->db->blocking_keys
  de = dictFind(c->db->blocking_keys,keys[j]);
  if (de == NULL) {
   int retval;
   /* For every key we take a list of clients blocked for it */
   l = listCreate();
   retval = dictAdd(c->db->blocking_keys,keys[j],l);
   incrRefCount(keys[j]);
   redisAssertWithInfo(c,keys[j],retval == DICT_OK);
  } else {
   l = dictGetVal(de);
  }
  listAddNodeTail(l,c);//添加到阻塞键的客户点链表中
 }
 blockClient(c,REDIS_BLOCKED_LIST);//设置客户端阻塞标志
}
</div>

blockClient函数只是简单的设置客户端属性,如下

void blockClient(redisClient *c, int btype) {
 c->flags |= REDIS_BLOCKED;//设置标志
 c->btype = btype;//阻塞操作类型
 server.bpop_blocked_clients++;
}
</div>

由于这个函数之后,brpop命令执行函数就结束了,由于没有给客户端发送消息,所以客户端就阻塞在read调用中。那么如何解开客户端的阻塞了?

插入一个元素解阻塞

任何指令的执行函数都是在processCommand函数中调用call函数,然后在call函数中调用命令执行函数,lpush也一样。当执行完lpush之后,此时链表不为空,回到processCommand调用中,执行以下语句

if (listLength(server.ready_keys))
   handleClientsBlockedOnLists();
</div>

这两行代码是先检查server.ready_keys是否为空,如果不为空,说明已经有一些就绪的链表,此时可以判断是否有客户端阻塞在这个键值上,如果有,则唤醒;现在问题又来了,这个server.ready_keys在哪更新链表了?

原来是在dbAdd函数中,当往数据库中添加的值类型为REDIS-LIST时,这时就要调用signalListAsReady函数将链表指针添加进server.ready_keys:

//db.c
void dbAdd(redisDb *db, robj *key, robj *val) {
 sds copy = sdsdup(key->ptr);
 int retval = dictAdd(db->dict, copy, val);//将数据添加进数据库
 redisAssertWithInfo(NULL,key,retval == REDIS_OK);
 //判
  


 
分享到:QQ空间新浪微博腾讯微博微信百度贴吧QQ好友复制网址打印

您可能想查找下面的文章:

  • Redis教程之代理ip池设计方法详解
  • Redis中的数据过期策略详解
  • 详解在Redis在Centos7上的安装部署
  • 详解用Redis实现Session功能
  • 详解Centos7下配置Redis并开机自启动
  • 详解利用redis + lua解决抢红包高并发的问题
  • 详解Redis用链表实现消息队列
  • 详解Redis中的双链表结构
  • Redis教程(十三):管线详解
  • Redis教程(十):持久化详解

相关文章

  • 2017-11-28Redis的rdb 和aof 持久化的区别
  • 2017-05-11CentOS Linux系统下安装Redis过程和配置参数说明
  • 2017-05-11redis的hGetAll函数的性能问题(记Redis那坑人的HGETALL)
  • 2017-05-11CentOS下Redis数据库的基本安装与配置教程
  • 2017-05-11Redis批量删除KEY的方法
  • 2017-05-11redis中使用redis-dump导出、导入、还原数据实例
  • 2017-05-11详解用Redis实现Session功能
  • 2017-05-11CentOS 6.6下Redis安装配置记录
  • 2017-05-11redis常用命令、常见错误、配置技巧等分享
  • 2017-05-11让Redis在你的系统中发挥更大作用的几点建议

文章分类

  • MsSql
  • Mysql
  • oracle
  • MariaDB
  • DB2
  • SQLite
  • PostgreSQL
  • MongoDB
  • Redis
  • Access
  • 数据库其它
  • sybase
  • HBase

最近更新的内容

    • Linux下安装Redis并设置相关服务
    • 关于redis启动时报错:Could not get a resource from the pool。
    • EasyCMS在幼儿园视频直播项目实战中以redis操作池的方式应对高并发的redis操作问题
    • 将MongoDB作为Redis式的内存数据库的使用方法
    • Redis中5种数据结构的使用场景介绍
    • Linux下Redis安装配置教程
    • Redis String 类型和 Hash 类型学习笔记与总结
    • Redis的使用模式之计数器模式实例
    • redis 自动关闭订单
    • 利用Redis实现SQL伸缩的方法简介

关于我们 - 联系我们 - 免责声明 - 网站地图

©2020-2025 All Rights Reserved. linkedu.com 版权所有