SpringBoot集成RabbitMQ入门使用

添加依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<dependency>

配置信息

属性描述
spring.rabbitmq.addresses逗号分隔的RabbitMQ代理地址列表
spring.rabbitmq.host代理的主机(默认为localhost)
spring.rabbitmq.port代理的端口(默认为5672)
spring.rabbitmq.username访问代理所使用的用户名(可选)
spring.rabbitmq.password访问代理所使用的密码(可选)
spring.rabbitmq.template.exchange设置默认的Exchange(可选)
spring.rabbitmq.template.routing-key设置默认的Routing-Key(可选)
spring.rabbitmq.template.receive-timeout设置默认超时时间(可选)

通过RabbitTemplate发送消息

// 发送原始的消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;

// 发送根据对象转换而成的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

// 发送根据对象转换而成的消息并且带有后期处理的功能
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor mPP) throws AmqpException;

代码示例

@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private RabbitTemplate rabbit;

@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}

public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
}

配置消息转换器

  • Jackson2JsonMessageConverter 使用Jackson2JSON实现对象和JSON的相互转换
  • MarshallingMessageConverter 使用Spring的Marshaller和Unmarshaller进行转换
  • SerializerMessageConverter 使用Spring的Serializer和Deserializer转换String和任意种类的原生对象
  • SimpleMessageConverter 转换String、字节数组和Serializable类型
  • ContentTypeDelegatingMessageConverter 基于contentType头信息,讲转换功能委托给另外一个MessageConverter
  • MessaagingMessageConverter 将消息转换功能委托为另外一个MessageConverter,并将头信息的转换委托给AmqpHeaderConverter

设置消息属性

通过send方法

public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}

通过convertAndSend方法

@Override
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order.queue", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}

接受来自RabbitMQ的消息

  • 使用RabbitTemplate从队列拉取消息
  • 将消息推送至带有@RabbitListener注解的方法

使用RabbitTemplate接收消息

@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbit;
private MessageConverter converter;

@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbit) {
this.rabbit = rabbit;
this.converter = rabbit.getMessageConverter();
}

public Order receiveOrder() {
// 延时等待返回
// Message message = rabbit.receive("tacocloud.orders", 300000);

// 无等待返回
Message message = rabbit.receive("tacocloud.orders");

// 手动转换
// return message != null ? (Order) converter.fromMessage(message) : null;

// 自动转换
return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
// 另一种自动转换写法
// return rabbit.receiveAndConvert("tacocloud.order.queue", new ParamterizedTypeReference<Order>() {});
}
}

使用监听器处理RabbitMQ的消息

@Component
public class OrderListener {
private KitchenUI ui;

@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}

// 当消息抵达RabbitMQ队列时该方法应该被调用
@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}