Adding the dependency

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

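Configuration properties
Property | Description |
---|---|
spring.rabbitmq.addresses | Comma-separated list of RabbitMQ broker addresses |
spring.rabbitmq.host | The broker's host (defaults to localhost) |
spring.rabbitmq.port | The broker's port (defaults to 5672) |
spring.rabbitmq.username | Username for accessing the broker (optional) |
spring.rabbitmq.password | Password for accessing the broker (optional) |
spring.rabbitmq.template.exchange | Default exchange for send operations (optional) |
spring.rabbitmq.template.routing-key | Default routing key for send operations (optional) |
spring.rabbitmq.template.receive-timeout | Default timeout for receive operations (optional) |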
Sending messages with RabbitTemplate

    // Send a raw Message object
    void send(Message message) throws AmqpException;
    void send(String routingKey, Message message) throws AmqpException;
    void send(String exchange, String routingKey, Message message) throws AmqpException;

    // Convert an object to a Message, then send it
    void convertAndSend(Object message) throws AmqpException;
    void convertAndSend(String routingKey, Object message) throws AmqpException;
    void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

    // Convert an object, apply a MessagePostProcessor, then send it
    void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
    void convertAndSend(String routingKey, Object message, MessagePostProcessor mPP) throws AmqpException;
    void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor mPP) throws AmqpException;

Overloads without an exchange or routing-key argument use the template's defaults (see spring.rabbitmq.template.exchange and spring.rabbitmq.template.routing-key above).
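Code example

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class RabbitOrderMessagingService implements OrderMessagingService {

        private final RabbitTemplate rabbit;

        @Autowired
        public RabbitOrderMessagingService(RabbitTemplate rabbit) {
            this.rabbit = rabbit;
        }

        public void sendOrder(Order order) {
            // Convert the Order to a Message by hand, then send it
            // with "tacocloud.order" as the routing key.
            MessageConverter converter = rabbit.getMessageConverter();
            MessageProperties props = new MessageProperties();
            Message message = converter.toMessage(order, props);
            rabbit.send("tacocloud.order", message);
        }
    }
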
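Configuring a message converter
- Jackson2JsonMessageConverter: converts objects to and from JSON using the Jackson 2 JSON processor
- MarshallingMessageConverter: converts using Spring's Marshaller and Unmarshaller
- SerializerMessageConverter: converts String and native objects of any kind using Spring's Serializer and Deserializer
- SimpleMessageConverter: converts String, byte arrays, and Serializable types (this is the default converter)
- ContentTypeDelegatingMessageConverter: delegates to another MessageConverter based on the contentType header
- MessagingMessageConverter: delegates message conversion to an underlying MessageConverter, and header conversion to an AmqpHeaderConverter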
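To switch converters, it is usually enough to declare a bean of type MessageConverter; Spring Boot's auto-configuration then applies it to the RabbitTemplate. The configuration fragment below is a minimal sketch, assuming Jackson (jackson-databind) is on the classpath, which spring-boot-starter-amqp alone may not guarantee. The class name MessagingConfig is hypothetical.

```java
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class MessagingConfig {

    // Replaces the default SimpleMessageConverter with JSON conversion.
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
```

With JSON conversion the payload type (Order here) no longer has to implement Serializable, which is the usual reason for making this change.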
Setting message properties
Using the send method

    public void sendOrder(Order order) {
        MessageConverter converter = rabbit.getMessageConverter();
        MessageProperties props = new MessageProperties();
        // Set the custom header before the Message is created
        props.setHeader("X_ORDER_SOURCE", "WEB");
        Message message = converter.toMessage(order, props);
        rabbit.send("tacocloud.order", message);
    }

Using the convertAndSend method

    @Override
    public void sendOrder(Order order) {
        rabbit.convertAndSend("tacocloud.order.queue", order,
            new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    // Runs after conversion and before the message is sent
                    MessageProperties props = message.getMessageProperties();
                    props.setHeader("X_ORDER_SOURCE", "WEB");
                    return message;
                }
            });
    }

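The order of operations behind convertAndSend with a post-processor (convert first, then post-process, then send) can be sketched with plain JDK types. Everything in this block (SketchMessage, SketchPostProcessor, PostProcessSketch) is a hypothetical stand-in, not a Spring AMQP class; it only models the hook and does not talk to a broker.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a message: a body plus mutable headers.
final class SketchMessage {
    final byte[] body;
    final Map<String, Object> headers = new HashMap<>();

    SketchMessage(byte[] body) {
        this.body = body;
    }
}

// Hypothetical stand-in for a post-processing hook.
@FunctionalInterface
interface SketchPostProcessor {
    SketchMessage postProcess(SketchMessage message);
}

final class PostProcessSketch {
    // Step 1: convert the payload. Step 2: let the hook adjust the message.
    // A real template would then hand the result to the broker.
    static SketchMessage convertAndPostProcess(String payload, SketchPostProcessor hook) {
        SketchMessage converted = new SketchMessage(payload.getBytes(StandardCharsets.UTF_8));
        return hook.postProcess(converted);
    }
}
```

Because the hook has a single abstract method, a caller can pass a lambda such as `m -> { m.headers.put("X_ORDER_SOURCE", "WEB"); return m; }`. Spring's MessagePostProcessor is a functional interface as well, so the anonymous class in the convertAndSend example can be replaced by a lambda in the same way.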
Receiving messages from RabbitMQ
- Pull messages from a queue with RabbitTemplate
- Have messages pushed to a method annotated with @RabbitListener
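The difference between the two models can be illustrated with a plain JDK queue. This is only an analogy: PullPushSketch and its methods are hypothetical, and no RabbitMQ or Spring API is involved. In the pull model the caller asks for a message and gets null if none arrives in time; in the push model a handler is invoked for each message.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class PullPushSketch {
    // Pull: the caller asks for one message and waits at most timeoutMillis.
    // Returns null when nothing arrives in time.
    static String pull(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    // Push: every message currently queued is handed to the listener.
    // Returns the number of messages delivered.
    static int pushAll(BlockingQueue<String> queue, Consumer<String> listener) {
        int delivered = 0;
        String next;
        while ((next = queue.poll()) != null) {
            listener.accept(next);
            delivered++;
        }
        return delivered;
    }
}
```

In this analogy, pull plays the role of a RabbitTemplate receive call with a timeout, and the listener passed to pushAll plays the role of a @RabbitListener method: with pull the application decides when to ask, with push the messages arrive whenever they are available.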
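Receiving messages with RabbitTemplate

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class RabbitOrderReceiver {

        private final RabbitTemplate rabbit;
        private final MessageConverter converter;

        @Autowired
        public RabbitOrderReceiver(RabbitTemplate rabbit) {
            this.rabbit = rabbit;
            this.converter = rabbit.getMessageConverter();
        }

        public Order receiveOrder() {
            // Pulls one message and converts it with the template's converter.
            // Returns null if no message is available.
            return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
            // Manual alternative: receive the raw Message and convert it yourself.
            //   Message message = rabbit.receive("tacocloud.orders");
            //   return message != null ? (Order) converter.fromMessage(message) : null;
        }
    }
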
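Handling RabbitMQ messages with a listener

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class OrderListener {

        private final KitchenUI ui;

        @Autowired
        public OrderListener(KitchenUI ui) {
            this.ui = ui;
        }

        // Invoked whenever a message arrives on the queue;
        // the payload is converted to an Order before the call.
        @RabbitListener(queues = "tacocloud.order.queue")
        public void receiveOrder(Order order) {
            ui.displayOrder(order);
        }
    }
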