程序员小董通过本文主要向大家介绍了redis 聊天,redis功能,redis,redis教程,redis是什么等相关知识,希望本文的分享对您有所帮助
本文为大家分享了Redis支持多人多聊天室功能的设计代码,供大家参考,具体内容如下
设计原理
左边的一个数据域,代表两个聊天室,聊天室id分别是827,729
在聊天室827里,有2个人,分别是jason22,jeff24他们分别已经阅读过聊天室内的id为5和6的消息
右边的一个数据域,代表了用户在不同的聊天室,jason22参与了827与729聊天室,在这两个聊天室里,他分别阅读到了id为5和id为10的消息
另外827聊天室内id为5的消息与729聊天室内id为5的消息不一样。
同时还有三个域
msgs:chatid
这是一个zset,有序集合,member是消息体,score是消息id
代表的是某个聊天室内已经发出的消息
另外 这里面存的是有用的消息,已经被所有人都阅读的消息就会被删除
ids:chatid
是一个String型的数据,里面放的是最新的消息的编号(发消息时,自增这个字段,即可获得最新的值)
ids:chat:
是一个String型的数据,里面放的是最新的聊天室的编号(创建聊天室时,自增这个字段)
代码
OK 开始看代码
public String createChat(Jedis conn, String sender, Set<String> recipients, String message) { //启动的时候redis里是没有ids:chat:这个键的 //自增之后返回1 String chatId = String.valueOf(conn.incr("ids:chat:")); return createChat(conn, sender, recipients, message, chatId); } /** * * @param conn * @param sender 发送消息的人 * @param recipients 接受消息的人 * @param message 待发送的消息 * @param chatId 聊天室的编号 * @return */ public String createChat( Jedis conn, String sender, Set<String> recipients, String message, String chatId){ //自己发的消息 自己也能接受到 recipients.add(sender); Transaction trans = conn.multi(); for (String recipient : recipients){ //聊天室的成员 最开始时 都阅读的是0号信息 trans.zadd("chat:" + chatId, 0, recipient); //记录每个人参加的聊天室 trans.zadd("seen:" + recipient, 0, chatId); } trans.exec(); return sendMessage(conn, chatId, sender, message); } public String sendMessage(Jedis conn, String chatId, String sender, String message) { //锁住聊天室 为啥? 人员变动了咋办 //这个acquireLock见上一章 String identifier = acquireLock(conn, "chat:" + chatId); if (identifier == null){ throw new RuntimeException("Couldn't get the lock"); } try { //给要发布的消息设定一个最新的编号 第一次时 返回的是1 long messageId = conn.incr("ids:" + chatId); HashMap<String,Object> values = new HashMap<String,Object>(); values.put("id", messageId); values.put("ts", System.currentTimeMillis()); values.put("sender", sender); values.put("message", message); String packed = new Gson().toJson(values); //某个聊天室的消息列表 //最旧的消息----消息json //默认的zset是按照score的值从小到大排序 conn.zadd("msgs:" + chatId, messageId, packed); }finally{ releaseLock(conn, "chat:" + chatId, identifier); } return chatId; }</div>
发消息现在就OK了,剩下的就是用户去拉取未读的消息了。这个比较麻烦,恩,相当的麻烦
@SuppressWarnings("unchecked") public List<ChatMessages> fetchPendingMessages(Jedis conn, String recipient) { //获得用户在各个聊天室 已经看到的最新消息的id //有几个聊天室 seenSet的size就是几 Set<Tuple> seenSet = conn.zrangeWithScores("seen:" + recipient, 0, -1); List<Tuple> seenList = new ArrayList<Tuple>(seenSet); Transaction trans = conn.multi(); for (Tuple tuple : seenList){ String chatId = tuple.getElement(); int seenId = (int)tuple.getScore(); //获取每个聊天室里 未读的所有消息 //min 和 max 可以是 -inf 和 +inf trans.zrangeByScore("msgs:" + chatId, String.valueOf(seenId + 1), "inf"); } //我参加了几个聊天室 results的长度就是几 List<Object> results = trans.exec(); //com.google.gson.Gson jar包自己下载吧 Gson gson = new Gson(); Iterator<Tuple> seenIterator = seenList.iterator(); Iterator<Object> resultsIterator = results.iterator(); //用户最后成功拉取的未读消息 存放在chatMessages List<ChatMessages> chatMessages = new ArrayList<ChatMessages>(); List<Object[]> seenUpdates = new ArrayList<Object[]>(); List<Object[]> msgRemoves = new ArrayList<Object[]>(); //这个大的while循环 用户参与了几个聊天室 就循环几次 while (seenIterator.hasNext()){ Tuple seen = seenIterator.next(); Set<String> messageStrings = (Set<String>)resultsIterator.next(); if (messageStrings.size() == 0){ //没有未读的消息 continue; } //代码运行到这里 //说明 我在某个聊天室 还有未读的消息 //seedid记录我已经拉取到的消息 初始为0 int seenId = 0; //当前处理的是哪个聊天室 String chatId = seen.getElement(); List<Map<String,Object>> messages = new ArrayList<Map<String,Object>>(); //我在聊天室未读的消息列表 for (String messageJson : messageStrings){ Map<String,Object> message = (Map<String,Object>)gson.fromJson( messageJson, new TypeToken<Map<String,Object>>(){}.getType()); int messageId = ((Double)message.get("id")).intValue(); if (messageId > seenId){ seenId = messageId; } message.put("id", messageId); //加入到成功拉取的列表里 messages.add(message); } //更新我在这个聊天室读到的最新消息 conn.zadd("chat:" + chatId, seenId, recipient); //记录我在某个聊天室读到的最新记录 seenUpdates.add(new Object[]{"seen:" + recipient, seenId, chatId}); //取出第0个member-score Set<Tuple> minIdSet = conn.zrangeWithScores("chat:" + chatId, 0, 0); //为啥删除呢? 每个聊天室是一个zset表 第一条记录代表的就是 所有用户至少都读了的消息 if (minIdSet.size() > 0){ Tuple tuple=minIdSet.iterator().next(); System.out.println("要删除的 tuple:"+tuple.getElement()+"--"+tuple.getScore()); msgRemoves.add(new Object[]{"msgs:" + chatId, tuple.getScore()}); } chatMessages.add(new ChatMessages(chatId, messages)); } trans = conn.multi(); for (Object[] seenUpdate : seenUpdates){ trans.zadd( (String)seenUpdate[0], (Integer)seenUpdate[1], (String)seenUpdate[2]); } for (Object[] msgRemove : msgRemoves){ trans.zremrangeByScore( (String)msgRemove[0], 0, ((Double)msgRemove[1]).intValue()); } trans.exec(); //返回的是我这次拉取获得的 最新的消息 return chatMessages; }</div>
OK,咱们看看测试代码:
package redisinaction; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import org.junit.BeforeClass; import org.junit.Test; import jedis.redis_in_action.Chapter06; import jedis.redis_in_action.Chapter06.ChatMessages; import redis.clients.jedis.Jedis; import redis.clients.jedis.Tuple; /** * This class is used for ... * @aut