Spring Cloud Stream 是消息中间件组件,它集成了 kafka 和 rabbitmq 。我们以 Rabbit MQ 为消息中间件系统为基础,介绍 Spring Cloud Stream 的使用。
使用 Spring Cloud Stream && RabbitMQ
首先来认识一下 Spring Cloud Stream 中的几个重要概念。
Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。
搭建一个stream 应用
在我们的boot项目中引入spring-cloud-starter-stream-rabbit 的配置。
yml 配置如下:
spring: cloud: stream: binders: # 指定binder 名字 magic_rabbit 自己可以更改 magic_rabbit: type: rabbit bindings: # 指定绑定的消息输入 websocketMessageIn: destination: websocketMessage binder: magic_rabbit # 指定绑定的消息输出 websocketMessageOut: destination: websocketMessage binder: magic_rabbit rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
理解配置文件很重要,基本上理解清楚了配置,也就明白 spring cloud stream 是怎么回事了。
spring.cloud.stream.binders,上面提到了 stream 的 3 个重要概念的第一个 「Destination binders」。上面的配置文件中就配置了一个 binder,命名为 magic_rabbit,指定 type 为 rabbit ,表示使用的是 rabbitmq 消息中间件,如果用的是 kafka ,则 type 设置为 kafka。
spring.cloud.stream.bindings ,对应上面提到到 「Destination Bindings」。这里面可以配置多个 input 或者 output,分别表示消息的接收通道和发送通道,对应到 rabbitmq 上就是不同的 exchange。这个配置文件里定义了input 、output,名称分别为 websocketMessageIn、websocketMessageOut。这个名称不是乱起的,在我们的程序代码中会用到,用来标示某个方法接收哪个 exchange 或者发送到哪个 exchange 。
每个通道下的 destination 属性指 exchange 的名称,binder 指定在 binders 里设置的 binder,上面配置中指定了 magic_rabbit 。
可以看到 websocketMessageIn、websocketMessageOut 对应的 destination 是相同的, 也就是对应相同的 exchange。一个表示消息来源,一个表示消息去向。
另外还可以设置 group 。因为服务很可能不止一个实例,如果启动多个实例,那么没必要每个实例都消费同一个消息,只要把功能相同的实例的 group 设置为同一个,那么就会只有一个实例来消费消息,避免重复消费的情况。如果设置了 group,那么 group 名称就会成为 queue 的名称,如果没有设置 group ,那么 queue 就会根据 destination + 随机字符串的方式命名。
如何接收消息
我们使用stream 内置的简单消息通道(消息通道也就是指消息的来源和去向)接口
编写如下接口:
public interface WebSocketMessageStream { String outPut = "websocketMessageOut"; String inPut = "websocketMessageIn"; @Input(inPut) SubscribableChannel input(); @Output(outPut) MessageChannel output(); }
消息发送通道定义,定义了一个 MessageChannel 类型的 output() 方法,用 @Output 注解标示,并指定了 binding 的名称为 websocketMessageOut。
消息接收通道定义,定义了一个 SubscribableChannel 类型的 input() 方法,表示订阅一个消息的方法,并用 @Input 注解标识,并且指定了 binging 的名称为 websocketMessageIn 。
在项目启动类上加上注解 @EnableBinding(value = {WebSocketMessageStream.class}) ,表明启用 stream ,并指定定义的 Channel 定义接口类。
@SpringBootApplication @EnableBinding(WebSocketMessageStream.class) public class StreamApplication { public static void main(String[] args) { SpringApplication.run(StreamApplication.class, args); } }
再提供一个消息处理的服务类
@Component @Slf4j public class WebSocketMessageService { @Autowired private WebSocketMessageStream stream; @StreamListener(WebSocketMessageStream.inPut) public void messageReceived(String message){ log.info("receive message from mq:{}",message); // 监听器负责将消息发送至 wesocket 或者 做其他操作 } public void sendMessage(String message){ log.info("send message to mq:{}",message); Message stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(message).build(); stream.output().send(stringMessage); } }
使用 @StreamListener 注解,表示对消息进行订阅监控,指定 binding 的名称,对应的上面的配置文件,就是 spring.cloud.stream.bindings
启动 Application ,可以在 rabbitmq 管理控制台的 exchanges 中看到增加的 bindings 。
编写 controller 来实现消息发送:
@RestController @RequestMapping("/message") public class MessageController { @Autowired private WebSocketMessageService messageService; /** * 发送消息 * @param message * @return */ @RequestMapping(value = "/send",method = RequestMethod.GET) public String sendMessage(String message){ messageService.sendMessage(message); return "success"; } }
每次发布消息,就能看到控制台的日志。之后结合上篇文章,websocket 通知实践 我们就可以将 websocket 消息结合stream 来进行管理了。每次接收消息后,我们可以将消息通过stream 发给websocket 实现消息与服务的解耦。