SpringCloud学习笔记(6)

消息驱动

Stream

需求:消息中间件很多,希望向上抽象一个接口,我们不关心底层用的是什么消息中间件

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

就像 JDBC 形成一种规范,统一不同数据库的接口

1597725567239

什么是SpringCloud Stream

官方定义SpringCloud Stream是一个构建消息驱动微服务的框架。https://spring.io/projects/spring-cloud-stream#overview

SpringCloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念

目前仅支持RabbitMQ、Kafka。

流程:

pub生产者发送消息,BROKER接收消息放到队列中,订阅者接收到消息

选修必须走特定的通道:下嘻嘻通道MessageChannel

消息通道里的消息如何消费呢?谁负责收发处理:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

比如java里用的是RabbitMQ,大数据里用的是kafka,来回切换麻烦,链各个消息中间件的架构上不同

像RabbitMQ有exchange,kafka有Topic和Partitions分区

这些中间件的差异导致我们实际项目开发给我们造成了一定的困难,我们如果用了两个消息队列的其中一种,后面的业务需求,我们想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候SpringCloud Stream给我们提供了一种解耦合的方式。

Stream的消息通信方式遵循了发布-订阅模式

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。INPUT对应于生产者,OUTPUT对应于消费者

img

Stream标准流程套路:

  • binder:很方便的连接中间件,屏蔽差异
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置
  • Source(生产)和sink(消费):简单地可理解为参照对象是spring cloud stream自身,从stream发布消息就是输出,接收消息就是输入

1597730581088

常用注解:

组成 说明
Middleware 中间件,目前只支FRabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实行了KafKa和RabbitMQ的Binder,通过 Binder可以很方便的连接中间件,可以动态的改变消息类型(对应kafka的topic, RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输入通接收到的消息息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

消息生产者

要新建3个子模块

  • cloud-stream-rabbitmq-provide8801:作为生产者进行发消息模块
  • cloud-stream-rabbitmq-consumer8802:作为消息接收模块
  • cloud-stream-rabbitmq-consumer8802:作为消息接收模块

新建模块 stream-rabbitmq-provider8801

8801 pom依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2022</artifactId>
<groupId>com.wzg.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>stream-rabbitmq-provide8801</artifactId>

<dependencies>
<!-- stream-rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

<!--eureka-client 目前,这个不是必须的-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency><!-- 引入自己定义的api通用包,可以使用Payment支付Entity -->
<groupId>com.wzg.springcloud</groupId>
<artifactId>api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>

yml 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
server:
port: 8801

spring:
application:
name: stream-provider
cloud:
stream:
binders: # 在此配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于和binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 表示是生产者,向rabbitMQ发送消息
destination: studyExchange # 表示要使用的Exchange名称
content-type: application/json # 设置消息类型,本次是json,文本是 "text/plain"
# binder: defaultRabbit # 设置要绑定的消息服务的具体配置

eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳时间,默认是30秒
lease-expiration-duration-in-seconds: 5 # 最大心跳间隔不能超过5秒,默认90秒
instance-id: send-8801.com # 在信息列表显示主机名称
prefer-ip-address: true # 访问路径变为ip地址

主启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.wzg.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

/**
* @author whlie(true){learn}
*/
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class Provider8801 {
public static void main(String[] args) {
SpringApplication.run(Provider8801.class,args);
}
}

业务类:(此业务类不是以前的service,而实负责推送消息的服务类)

  • 发送消息的接口类
  • 发送消息接口类的实现类
  • controller
1
2
3
4
5
6
7
8
package com.wzg.springcloud.serivce;

/**
* @author whlie(true){learn}
*/
public interface IMessageProvider {
public String send();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.wzg.springcloud.serivce;

/**
* @author whlie(true){learn}
*/

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

@EnableBinding(Source.class) // 定义消息的推送管道 output//不是和controller打交道的service,而是发送消息的推送服务类
public class IMessageProviderImpl implements IMessageProvider {
//上面是自定义的接口
@Resource
private MessageChannel output;//消息发送管道

@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());// 绑定器
System.out.println("******serial: " + serial);
return null;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.wzg.springcloud.controller;

import com.wzg.springcloud.serivce.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
* @author whlie(true){learn}
*/
@RestController
public class SendMessageController {

@Resource // 自己的类
private IMessageProvider messageProvider;

@GetMapping("/sendMessage")
public String sendMessage(){
return messageProvider.send(); // 自己定义的方法,但是里面调用了MessageChannel.send()方法
}
}

启动Eureka Server 7001,再启动8801,进行测试,看是否rabbitMQ中有我们发送的消息。

消息消费者

新建模块 stream-rabbitmq-consumer8802

pom依赖和生产者一样。

yml配置: 在 stream的配置上,和生产者只有一处不同的地方,output 改成 input

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
server:
port: 8802
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在次配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于和binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 表示是消费者,这里是唯一和生产者不同的地方,向rabbitMQ发送消息
destination: studyExchange # 表示要使用的Exchange名称
content-type: application/json # 设置消息类型,本次是json,文本是 "text/plain"
binder: defaultRabbit # 设置要绑定的消息服务的具体配置
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳时间,默认是30秒
lease-expiration-duration-in-seconds: 5 # 最大心跳间隔不能超过5秒,默认90秒
instance-id: receive-8802.com # 在信息列表显示主机名称
prefer-ip-address: true # 访问路径变为ip地址

接收消息的业务类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@Component
@EnableBinding(Sink.class)
public class ConsumerController {

@Value("${server.port}")
private String serverPort;

@StreamListener(Sink.INPUT) // 消费者
public void input(Message<String> message){
System.out.println("消费者1号,serverport: " + serverPort + ",接受到的消息:" + message.getPayload());
}
}

配置分组消费

新建 stream-rabbitmq-consumer8803 模块:

8803 就是 8802 clone出来的。

当运行时,会有两个问题。

第一个问题,两个消费者都接收到了消息,这属于重复消费。例如,消费者进行订单创建,这样就创建了两份订单,会造成系统错误。

注意在stream中处同一个group中的多个消费者是竞争关系,就能保证消息只会被其中一个应用消费一次。

不同组是可以全面消费(重复消费)的

同一组内会发生竞争关系,只有其中一个可以消费。

Stream默认不同的微服务是不同的组

1597731630685

对于重复消费这种问题,导致的原因是默认每个微服务是不同的group,组流水号不一样,所以被认为是不同组,两个都可以消费。

解决的办法就是自定义配置分组:

消费者 yml 文件配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 8802 的消费者
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
group: dkfA # 自定义分组配置

# 8803 的消费者
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
group: dkfB # 自定义分组配置

当两个消费者配置的 group 都为 dkfA 时,就属于同一组,就不会被重复消费。(两个消费者消费同一队列)

消息持久化

加上group配置,就已经实现了消息的持久化。


SpringCloud学习笔记(6)
https://yztldxdz.top/2022/09/21/SpringCloud学习笔记(6)/
发布于
2022年9月21日
许可协议