消息驱动
Stream
需求:消息中间件很多,希望向上抽象一个接口,我们不关心底层用的是什么消息中间件
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
就像 JDBC 形成一种规范,统一不同数据库的接口
什么是SpringCloud Stream
官方定义SpringCloud Stream是一个构建消息驱动微服务的框架。https://spring.io/projects/spring-cloud-stream#overview
SpringCloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念
目前仅支持RabbitMQ、Kafka。
流程:
pub生产者发送消息,BROKER接收消息放到队列中,订阅者接收到消息
选修必须走特定的通道:下嘻嘻通道MessageChannel
消息通道里的消息如何消费呢?谁负责收发处理:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅
比如java里用的是RabbitMQ,大数据里用的是kafka,来回切换麻烦,链各个消息中间件的架构上不同
像RabbitMQ有exchange,kafka有Topic和Partitions分区
这些中间件的差异导致我们实际项目开发给我们造成了一定的困难,我们如果用了两个消息队列的其中一种,后面的业务需求,我们想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候SpringCloud Stream给我们提供了一种解耦合的方式。
Stream的消息通信方式遵循了发布-订阅模式
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。INPUT对应于生产者,OUTPUT对应于消费者
Stream标准流程套路:
- binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置
- Source(生产)和sink(消费):简单地可理解为参照对象是spring cloud stream自身,从stream发布消息就是输出,接收消息就是输入
常用注解:
组成 |
说明 |
Middleware |
中间件,目前只支FRabbitMQ和Kafka |
Binder |
Binder是应用与消息中间件之间的封装,目前实行了KafKa和RabbitMQ的Binder,通过 Binder可以很方便的连接中间件,可以动态的改变消息类型(对应kafka的topic, RabbitMQ的exchange),这些都可以通过配置文件来实现 |
@Input |
注解标识输入通道,通过该输入通接收到的消息息进入应用程序 |
@Output |
注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener |
监听队列,用于消费者的队列的消息接收 |
@EnableBinding |
指信道channel和exchange绑定在一起 |
消息生产者
要新建3个子模块
- cloud-stream-rabbitmq-provide8801:作为生产者进行发消息模块
- cloud-stream-rabbitmq-consumer8802:作为消息接收模块
- cloud-stream-rabbitmq-consumer8802:作为消息接收模块
新建模块 stream-rabbitmq-provider8801
8801 pom依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud2022</artifactId> <groupId>com.wzg.springcloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>stream-rabbitmq-provide8801</artifactId>
<dependencies> <!-- stream-rabbit --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
<!--eureka-client 目前,这个不是必须的--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency><!-- 引入自己定义的api通用包,可以使用Payment支付Entity --> <groupId>com.wzg.springcloud</groupId> <artifactId>api-commons</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies>
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties>
</project>
|
yml 配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| server: port: 8801
spring: application: name: stream-provider cloud: stream: binders: defaultRabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: output: destination: studyExchange content-type: application/json
eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka/ instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: send-8801.com prefer-ip-address: true
|
主启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.wzg.springcloud;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) public class Provider8801 { public static void main(String[] args) { SpringApplication.run(Provider8801.class,args); } }
|
业务类:(此业务类不是以前的service,而实负责推送消息的服务类)
- 发送消息的接口类
- 发送消息接口类的实现类
- controller
1 2 3 4 5 6 7 8
| package com.wzg.springcloud.serivce;
public interface IMessageProvider { public String send(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package com.wzg.springcloud.serivce;
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource; import java.util.UUID;
@EnableBinding(Source.class) public class IMessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output;
@Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("******serial: " + serial); return null; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.wzg.springcloud.controller;
import com.wzg.springcloud.serivce.IMessageProvider; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController public class SendMessageController {
@Resource private IMessageProvider messageProvider;
@GetMapping("/sendMessage") public String sendMessage(){ return messageProvider.send(); } }
|
启动Eureka Server 7001,再启动8801,进行测试,看是否rabbitMQ中有我们发送的消息。
消息消费者
新建模块 stream-rabbitmq-consumer8802
pom依赖和生产者一样。
yml配置: 在 stream的配置上,和生产者只有一处不同的地方,output 改成 input
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| server: port: 8802 spring: application: name: cloud-stream-provider cloud: stream: binders: defaultRabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: input: destination: studyExchange content-type: application/json binder: defaultRabbit eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka/ instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: receive-8802.com prefer-ip-address: true
|
接收消息的业务类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message;
@Component @EnableBinding(Sink.class) public class ConsumerController {
@Value("${server.port}") private String serverPort;
@StreamListener(Sink.INPUT) public void input(Message<String> message){ System.out.println("消费者1号,serverport: " + serverPort + ",接受到的消息:" + message.getPayload()); } }
|
配置分组消费
新建 stream-rabbitmq-consumer8803 模块:
8803 就是 8802 clone出来的。
当运行时,会有两个问题。
第一个问题,两个消费者都接收到了消息,这属于重复消费。例如,消费者进行订单创建,这样就创建了两份订单,会造成系统错误。
注意在stream中处同一个group中的多个消费者是竞争关系,就能保证消息只会被其中一个应用消费一次。
不同组是可以全面消费(重复消费)的
同一组内会发生竞争关系,只有其中一个可以消费。
Stream默认不同的微服务是不同的组
对于重复消费这种问题,导致的原因是默认每个微服务是不同的group,组流水号不一样,所以被认为是不同组,两个都可以消费。
解决的办法就是自定义配置分组:
消费者 yml 文件配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| bindings: input: destination: studyExchange content-type: application/json binder: defaultRabbit group: dkfA bindings: input: destination: studyExchange content-type: application/json binder: defaultRabbit group: dkfB
|
当两个消费者配置的 group 都为 dkfA 时,就属于同一组,就不会被重复消费。(两个消费者消费同一队列)
消息持久化
加上group配置,就已经实现了消息的持久化。