SpringBoot 集成 RabbitMQ 入门使用

添加依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<dependency>

配置信息

属性描述
spring.rabbitmq.addresses 逗号分隔的 RabbitMQ 代理地址列表
spring.rabbitmq.host 代理的主机(默认为 localhost)
spring.rabbitmq.port 代理的端口(默认为 5672)
spring.rabbitmq.username 访问代理所使用的用户名(可选)
spring.rabbitmq.password 访问代理所使用的密码(可选)
spring.rabbitmq.template.exchange 设置默认的 Exchange(可选)
spring.rabbitmq.template.routing-key 设置默认的 Routing-Key(可选)
spring.rabbitmq.template.receive-timeout 设置默认超时时间(可选)

通过 RabbitTemplate 发送消息

// 发送原始的消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;

// 发送根据对象转换而成的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

// 发送根据对象转换而成的消息并且带有后期处理的功能
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor mPP) throws AmqpException;

代码示例

@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private RabbitTemplate rabbit;

@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}

public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
}

配置消息转换器

  • Jackson2JsonMessageConverter 使用 Jackson2JSON 实现对象和 JSON 的相互转换
  • MarshallingMessageConverter 使用 Spring 的 Marshaller 和 Unmarshaller 进行转换
  • SerializerMessageConverter 使用 Spring 的 Serializer 和 Deserializer 转换 String 和任意种类的原生对象
  • SimpleMessageConverter 转换 String、字节数组和 Serializable 类型
  • ContentTypeDelegatingMessageConverter 基于 contentType 头信息,讲转换功能委托给另外一个 MessageConverter
  • MessaagingMessageConverter 将消息转换功能委托为另外一个 MessageConverter,并将头信息的转换委托给 AmqpHeaderConverter

设置消息属性

通过 send 方法

public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}

通过 convertAndSend 方法

@Override
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order.queue", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}

接受来自 RabbitMQ 的消息

  • 使用 RabbitTemplate 从队列拉取消息
  • 将消息推送至带有 @RabbitListener 注解的方法

使用 RabbitTemplate 接收消息

@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbit;
private MessageConverter converter;

@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbit) {
this.rabbit = rabbit;
this.converter = rabbit.getMessageConverter();
}

public Order receiveOrder() {
// 延时等待返回
// Message message = rabbit.receive("tacocloud.orders", 300000);

// 无等待返回
Message message = rabbit.receive("tacocloud.orders");

// 手动转换
// return message != null ? (Order) converter.fromMessage(message) : null;

// 自动转换
return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
// 另一种自动转换写法
// return rabbit.receiveAndConvert("tacocloud.order.queue", new ParamterizedTypeReference<Order>() {});
}
}

使用监听器处理 RabbitMQ 的消息

@Component
public class OrderListener {
private KitchenUI ui;

@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}

// 当消息抵达RabbitMQ队列时该方法应该被调用
@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}