• linkedu视频
  • 平面设计
  • 电脑入门
  • 操作系统
  • 办公应用
  • 电脑硬件
  • 动画设计
  • 3D设计
  • 网页设计
  • CAD设计
  • 影音处理
  • 数据库
  • 程序设计
  • 认证考试
  • 信息管理
  • 信息安全
菜单
linkedu.com
  • 网页制作
  • 数据库
  • 程序设计
  • 操作系统
  • CMS教程
  • 游戏攻略
  • 脚本语言
  • 平面设计
  • 软件教程
  • 网络安全
  • 电脑知识
  • 服务器
  • 视频教程
  • JavaScript
  • ASP.NET
  • PHP
  • 正则表达式
  • AJAX
  • JSP
  • ASP
  • Flex
  • XML
  • 编程技巧
  • Android
  • swift
  • C#教程
  • vb
  • vb.net
  • C语言
  • Java
  • Delphi
  • 易语言
  • vc/mfc
  • 嵌入式开发
  • 游戏开发
  • ios
  • 编程问答
  • 汇编语言
  • 微信小程序
  • 数据结构
  • OpenGL
  • 架构设计
  • qt
  • 微信公众号
您的位置:首页 > 程序设计 >Java > 详解spring boot集成RabbitMQ

详解spring boot集成RabbitMQ

作者:SamHxm 字体:[增加 减小] 来源:互联网 时间:2017-05-28

SamHxm 通过本文主要向大家介绍了spring boot rabbitmq,spring rabbitmq,spring整合rabbitmq,rabbitmq springmvc,spring集成rabbitmq等相关知识,希望对您有所帮助,也希望大家支持linkedu.com www.linkedu.com

RabbitMQ作为AMQP的代表性产品,在项目中大量使用。结合现在主流的spring boot,极大简化了开发过程中所涉及到的消息通信问题。

首先正确的安装RabbitMQ及运行正常。

RabbitMQ需啊erlang环境,所以首先安装对应版本的erlang,可在RabbitMQ官网下载

# rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
</div>

使用yum安装RabbitMQ,避免缺少依赖包引起的安装失败

# yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
</div>

启动RabbitMQ

# /sbin/service rabbitmq-server start
</div>

由于RabbitMQ默认提供的guest用户只能本地访问,所以额外创建用户用于测试

# /sbin/rabbitmqctl add_user test test123
用户名:test,密码:test123
</div>

开启web管理插件

# rabbitmq-plugins enable rabbitmq_management
</div>

并使用之前创建的用户登录,并设置该用户为administrator,虚拟主机地址为/

spring boot 引入相关依赖

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
</dependencies>
</div>

消息生产者

application.properties添加一下配置

spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
</div>

spring boot配置类,作用为指定队列,交换器类型及绑定操作

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

  //声明队列
  @Bean
  public Queue queue1() {
    return new Queue("hello.queue1", true); // true表示持久化该队列
  }

  @Bean
  public Queue queue2() {
    return new Queue("hello.queue2", true);
  }

  //声明交互器
  @Bean
  TopicExchange topicExchange() {
    return new TopicExchange("topicExchange");
  }

  //绑定
  @Bean
  public Binding binding1() {
    return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
  }

  @Bean
  public Binding binding2() {
    return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
  }

}

</div>

共声明了2个队列,分别是hello.queue1,hello.queue2,交换器类型为TopicExchange,并与hello.queue1,hello.queue2队列分别绑定。

生产者类

import java.util.UUID;

import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @PostConstruct
  public void init() {
    rabbitTemplate.setConfirmCallback(this);
    rabbitTemplate.setReturnCallback(this);
  }

  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (ack) { 
      System.out.println("消息发送成功:" + correlationData); 
    } else { 
      System.out.println("消息发送失败:" + cause); 
    } 

  }

  @Override
  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败");

  }

  //发送消息,不需要实现任何接口,供外部调用。
  public void send(String msg){

    CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());

    System.out.println("开始发送消息 : " + msg.toLowerCase());
    String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString();
    System.out.println("结束发送消息 : " + msg.toLowerCase());
    System.out.println("消费者响应 : " + response + " 消息处理完成");
  }
}

</div>

要点:

1.注入RabbitTemplate

2.实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback接口(非必须)。
ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。

3.实现消息发送方法。调用rabbitTemplate相应的方法即可。rabbitTemplate常用发送方法有

rabbitTemplate.send(message);  //发消息,参数类型为org.springframework.amqp.core.Message
rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送
rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。
</div>

针对业务场景选择合适的消息发送方式即可。

消息消费者

application.properties添加一下配置

spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123

spring.rabbitmq.listener.concurrency=2  //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency=2 //最大消息监听线程数
</div>

消费者类

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

  @RabbitListener(queues = "hello.queue1")
  public String processMessage1(String msg) {
    System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);
    return msg.toUpperCase();
  }

  @RabbitListener(queues = "hello.queue2")
  public void processMessage2(String msg) {
    System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue2队列的消息:" + msg);
  }
}

</div>

由于定义了2个队列,所以分别定义不同的监听器监听不同的队列。由于最小消息监听线程数和最大消息监听线程数都是2,所以每个监听器各有2个线程实现监听功能。

要点:

1.监听器参数类型与消息实际类型匹配。在生产者中发送的消息实际类型是String,所以这里监听器参数类型也是String。

2.如果监听器需要有响应返回给生产者,直接在监听方法中return即可。

运行测试

import java.util.Date;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.sam.demo.rabbitmq.Application;
import com.sam.demo.rabbitmq.sender.Sender;

@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitTests {

  @Autowired
  private Sender sender;

  @Test
  public void sendTest() throws Exception {
    while(true){




 
分享到:QQ空间新浪微博腾讯微博微信百度贴吧QQ好友复制网址打印

您可能想查找下面的文章:

  • spring boot整合RabbitMQ实例详解(Fanout模式)
  • Spring Boot整合RabbitMQ实例(Topic模式)
  • spring boot整合RabbitMQ(Direct模式)
  • 详解spring boot集成RabbitMQ
  • spring boot整合RabbitMQ实例详解(Fanout模式)
  • Spring Boot整合RabbitMQ实例(Topic模式)
  • spring boot整合RabbitMQ(Direct模式)

相关文章

  • 2017-05-28java版简单的猜数字游戏实例代码
  • 2017-05-28详解spring security 配置多个AuthenticationProvider
  • 2017-08-30java面试笔试需准备内容
  • 2017-05-28Java中内存异常StackOverflowError与OutOfMemoryError详解
  • 2017-05-28浅谈JavaWeb中的web.xml配置部署描述符文件
  • 2017-05-28java 数据的加密与解密普遍实例代码
  • 2017-05-28Hibernate映射之基本类映射和对象关系映射详解
  • 2017-05-28Mybatis调用MySQL存储过程的简单实现
  • 2017-05-28Java图片处理开源框架Thumbnailator
  • 2017-05-28java 根据坐标截取图片实例代码

文章分类

  • JavaScript
  • ASP.NET
  • PHP
  • 正则表达式
  • AJAX
  • JSP
  • ASP
  • Flex
  • XML
  • 编程技巧
  • Android
  • swift
  • C#教程
  • vb
  • vb.net
  • C语言
  • Java
  • Delphi
  • 易语言
  • vc/mfc
  • 嵌入式开发
  • 游戏开发
  • ios
  • 编程问答
  • 汇编语言
  • 微信小程序
  • 数据结构
  • OpenGL
  • 架构设计
  • qt
  • 微信公众号

最近更新的内容

    • Spring Boot(五)之跨域、自定义查询及分页
    • 跟我学Java Swing之游戏设计(1)
    • SpringMVC结合ajaxfileupload实现文件无刷新上传代码
    • 详解Spring全局异常处理的三种方式
    • 深入jetty的使用详解
    • Java中的FilterOutputStream 简介_动力节点Java学院整理
    • Spring Boot如何使用Spring Security进行安全控制
    • SWT(JFace) 文本编辑器 实现代码
    • 详解Mybatis的二级缓存配置
    • SpringMVC之简单的增删改查示例(SSM整合)

关于我们 - 联系我们 - 免责声明 - 网站地图

©2020-2025 All Rights Reserved. linkedu.com 版权所有