SpringBoot集成RabbitMQ入门使用

添加依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
<dependency>

配置信息

属性描述
spring.rabbitmq.addresses逗号分隔的RabbitMQ代理地址列表
spring.rabbitmq.host代理的主机(默认为localhost)
spring.rabbitmq.port代理的端口(默认为5672)
spring.rabbitmq.username访问代理所使用的用户名(可选)
spring.rabbitmq.password访问代理所使用的密码(可选)
spring.rabbitmq.template.exchange设置默认的Exchange(可选)
spring.rabbitmq.template.routing-key设置默认的Routing-Key(可选)
spring.rabbitmq.template.receive-timeout设置默认超时时间(可选)

通过RabbitTemplate发送消息

// 发送原始的消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;

// 发送根据对象转换而成的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

// 发送根据对象转换而成的消息并且带有后期处理的功能
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor mPP) throws AmqpException;

代码示例

@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
    private RabbitTemplate rabbit;

    @Autowired
    public RabbitOrderMessagingService(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
    }

    public void sendOrder(Order order) {
        MessageConverter converter = rabbit.getMessageConverter();
        MessageProperties props = new MessageProperties();
        Message message = converter.toMessage(order, props);
        rabbit.send("tacocloud.order", message);
    }
}

配置消息转换器

  • Jackson2JsonMessageConverter 使用Jackson2JSON实现对象和JSON的相互转换
  • MarshallingMessageConverter 使用Spring的Marshaller和Unmarshaller进行转换
  • SerializerMessageConverter 使用Spring的Serializer和Deserializer转换String和任意种类的原生对象
  • SimpleMessageConverter 转换String、字节数组和Serializable类型
  • ContentTypeDelegatingMessageConverter 基于contentType头信息,讲转换功能委托给另外一个MessageConverter
  • MessaagingMessageConverter 将消息转换功能委托为另外一个MessageConverter,并将头信息的转换委托给AmqpHeaderConverter

设置消息属性

通过send方法

public void sendOrder(Order order) {
    MessageConverter converter = rabbit.getMessageConverter();
    MessageProperties props = new MessageProperties();
    props.setHeader("X_ORDER_SOURCE", "WEB");
    Message message = converter.toMessage(order, props);
    rabbit.send("tacocloud.order", message);
}

通过convertAndSend方法

@Override
public void sendOrder(Order order) {
    rabbit.convertAndSend("tacocloud.order.queue", order,
        new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties props = message.getMessageProperties();
                props.setHeader("X_ORDER_SOURCE", "WEB");
                return message;
            }
        });
}

接受来自RabbitMQ的消息

  • 使用RabbitTemplate从队列拉取消息
  • 将消息推送至带有@RabbitListener注解的方法

使用RabbitTemplate接收消息

@Component
public class RabbitOrderReceiver {
    private RabbitTemplate rabbit;
    private MessageConverter converter;

    @Autowired
    public RabbitOrderReceiver(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
        this.converter = rabbit.getMessageConverter();
    }

    public Order receiveOrder() {
        // 延时等待返回
        // Message message = rabbit.receive("tacocloud.orders", 300000);

        // 无等待返回
        Message message = rabbit.receive("tacocloud.orders");

        // 手动转换
        // return message != null ? (Order) converter.fromMessage(message) : null;

        // 自动转换
        return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
        // 另一种自动转换写法
        // return rabbit.receiveAndConvert("tacocloud.order.queue", new ParamterizedTypeReference<Order>() {});
    }
}

使用监听器处理RabbitMQ的消息

@Component
public class OrderListener {
    private KitchenUI ui;

    @Autowired
    public OrderListener(KitchenUI ui) {
        this.ui = ui;
    }

    // 当消息抵达RabbitMQ队列时该方法应该被调用
    @RabbitListener(queues = "tacocloud.order.queue")
    public void receiveOrder(Order order) {
        ui.displayOrder(order);
    }
}