SpringCloud学习笔记(9)

Seata处理分布式事务

分布式事务问题

只要用到分布式,必然会提及分布式的事务。

在分布式之前,一切组件全都在一台机器上。

在使用分布式之后,单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源。

业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证,但是全局的数据一致性问题没法保证。

用户购买商品的业务逻辑整个业务逻辑由3个微服务提供支持:

仓储服务:对给定的商品扣除仓储数量

订单服务:根据采购需求创建订单.

账户服务:从用户账户中扣除余额.

一句话:一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题。

一、Seata简介与安装

Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 官网

1.1 相关术语

一个典型的分布式事务过程,可以用分布式处理过程的一ID+三组件模型来描述。

一ID(全局唯一的事务ID):Transaction ID XID,在这个事务ID下的所有事务会被统一控制

三组件:

●Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚;(Server端,为单独服务器部署)

●Transaction Manager (TM):事务管理器,控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议;

●Resource Manager (RM):资源管理器,控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚

●Seata分TC、TM和RM三个角色,TC(Server端)为单独服务端部署,TM和RM(Client端)由业务系统集成(微服务)。

1.2 典型的分布式控制事务流程

1TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID;

2XID 在微服务调用链路的上下文中传播;(也就是在多个TM,RM中传播)

3RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖;

4TM 向 TC 发起针对 XID 的全局提交或回滚决议;

5TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

我们只需要使用一个@GlobalTransational注解在业务方法上

初始化操作

  1. 修改 conf/file.conf 文件:

主要修改自定义事务组名称 + 事务日志存储模式为db + 数据库连接信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
service {
#transaction service group mapping
vgroup_mapping.dkf_tx_group = "fsp_tx_group" # 修改这里
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#disable seata
disableGlobalTransaction = false
}

## transaction log store, only used in seata-server
store {
## store mode: file、db
mode = "db" # 修改这里

## file store property
file {
## store location dir
dir = "sessionStore"
}

## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "root" # 修改对
password = "123"
}
}
  1. 创建名和 file.conf 指定一致的数据库。

  2. 在新建的数据库里面创建数据表,db_store.sql文件在 conf 目录下(1.0.0有坑,没有sql文件,下载0.9.0的,使用它的sql文件即可)

  3. 修改 conf/registry.conf 文件内容:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    registry {
    # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa # 默认file
    type = "nacos"

    nacos { # 修改nacos的端口8848
    serverAddr = "localhost:8848"
    namespace = ""
    cluster = "default"
    }
  4. 先启动 nacos Server 服务,再启动seata Server 。

  5. 启动 Seata Server 报错,在bin目录创建 /logs/seata_gc.log 文件。再次双击 bat文件启动。

数据库准备

这里我们会创建三个服务,一个订单服务,一个库存服务,一个账户服务。

当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,
再通过远程调用账户服务来扣减用户账户里面的余额,
最后在订单服务中修改订单状态为已完成。

该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。

创建三个数据库: seata_account、seata_order、seata_storage

三个数据库都创建一个回滚日志表,seata/conf/ 有相应的sql文件(1.0.0没有,依然使用0.9.0中的)。

最终效果:

  • seata
    • branch_table
    • global_table
    • lock_table
  • seata_account
    • t_account
    • undo_log
  • seata_order
    • t_order
    • undo_log
  • seata_storage
    • t_storage
    • undo_log

开发

实现 下订单-> 减库存 -> 扣余额 -> 改(订单)状态

需要注意的是,下面做了 seata 与 mybatis 的整合,所以注意一下,和以往的mybatis的使用不太一样。

新建模块 alibaba-seata-order2001 :

pom依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2022</artifactId>
<groupId>com.wzg.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>seata-order-service2001</artifactId>

<dependencies>
<!-- seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0</version>
</dependency>
<!-- springcloud alibaba nacos 依赖,Nacos Server 服务注册中心 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

<!-- open feign 服务调用 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

<!-- springboot整合Web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<!-- 持久层支持 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<!--mysql-connector-java-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--jdbc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- mybatis -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>

<!-- 日常通用jar包 -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency><!-- 引入自己定义的api通用包,可以使用Payment支付Entity -->
<groupId>com.wzg.springcloud</groupId>
<artifactId>api-commons</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>

yml配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
server:
port: 2001

spring:
application:
name: seata-order-service
cloud:
alibaba:
seata:
# 自定义事务组,需要和当时在 seata/conf/file.conf 中的一致
tx-service-group: fsp_tx_group
nacos:
discovery:
server-addr: localhost:8848
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_order?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&useInformationSchema=false
username: root
password: 123


# 注意,这是自定义的,原来的是mapper_locations
mybatis:
mapperLocations: classpath:mapper/*.xml

feign:
hystrix:
enabled: false

logging:
level:
io:
seata: info

file.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}

service {

vgroup_mapping.fsp_tx_group = "default"

default.grouplist = "127.0.0.1:8091"
enableDegrade = false
disable = false
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
disableGlobalTransaction = false
}


client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
report.retry.count = 5
tm.commit.retry.count = 1
tm.rollback.retry.count = 1
}

## transaction log store
store {
## store mode: file、db
mode = "db"

## file store
file {
dir = "sessionStore"

# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}

## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "root"
password = "123"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
lock {
## the lock store mode: local、remote
mode = "remote"

local {
## store locks in user's database
}

remote {
## store locks in the seata's server
}
}
recovery {
#schedule committing retry period in milliseconds
committing-retry-period = 1000
#schedule asyn committing retry period in milliseconds
asyn-committing-retry-period = 1000
#schedule rollbacking retry period in milliseconds
rollbacking-retry-period = 1000
#schedule timeout retry period in milliseconds
timeout-retry-period = 1000
}

transaction {
undo.data.validation = true
undo.log.serialization = "jackson"
undo.log.save.days = 7
#schedule delete expired undo_log in milliseconds
undo.log.delete.period = 86400000
undo.log.table = "undo_log"
}

## metrics settings
metrics {
enabled = false
registry-type = "compact"
# multi exporters use comma divided
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}

support {
## spring
spring {
# auto proxy the DataSource bean
datasource.autoproxy = false
}
}

将 seata/conf/ 下的registry.cong 文件拷贝到 resource 目录下。

创建 domain 实体类 : Order 和 CommonResult 两个实体类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.wzg.springcloud.daomain;

/**
* @author whlie(true){learn}
*/

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;


@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private Long id;

private Long userId;

private Long productId;

private Integer count;

private BigDecimal money;

private Integer status; //订单状态:0:创建中;1:已完结
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.wzg.springcloud.daomain;

/**
* @author whlie(true){learn}
*/

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
private Integer code;
private String message;
private T data;

public CommonResult(Integer code, String message) {
this(code, message, null);
}
}

dao :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.wzg.springcloud.dao;

/**
* @author whlie(true){learn}
*/

import com.wzg.springcloud.daomain.Order;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

@Mapper
public interface OrderDao {
//新建订单
void create(Order order);

//修改订单状态,从零改为1
void update(@Param("userId") Long userId, @Param("status") Integer status);
}

在resource目录下创建mapper文件夹并创建OrderMapper.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >

<mapper namespace="com.wzg.springcloud.dao.OrderDao">

<!-- 以备后面会用到 -->
<resultMap id="BaseResultMap" type="com.wzg.springcloud.daomain.Order">
<id column="id" property="id" jdbcType="BIGINT"></id>
<result column="user_id" property="userId" jdbcType="BIGINT"></result>
<result column="product_id" property="productId" jdbcType="BIGINT"></result>
<result column="count" property="count" jdbcType="INTEGER"></result>
<result column="money" property="money" jdbcType="DECIMAL"></result>
<result column="status" property="status" jdbcType="INTEGER"></result>
</resultMap>

<insert id="create" >
insert into t_order values (null, #{userId}, #{productId}, #{count}, #{money}, 0);
</insert>


<update id="update">
update t_order
set status = 1
where user_id = #{userId}
and status = #{status};
</update>

</mapper>

service :

三个接口

1
2
3
4
5
6
7
8
9
10
package com.wzg.springcloud.service;

import com.wzg.springcloud.daomain.Order;

/**
* @author whlie(true){learn}
*/
public interface OrderService{
void create(Order order);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.wzg.springcloud.service;

import com.wzg.springcloud.daomain.CommonResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

/**
* @author whlie(true){learn}
*/
@FeignClient(value = "seata-storage-service")
public interface StorageService{
@PostMapping(value = "/storage/decrease")
CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.wzg.springcloud.service;

import com.wzg.springcloud.daomain.CommonResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

import java.math.BigDecimal;

/**
* @author whlie(true){learn}
*/
@FeignClient(value = "seata-account-service")
public interface AccountService{
@PostMapping(value = "/account/decrease")
CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}

一个实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.wzg.springcloud.service.iml;

import com.wzg.springcloud.dao.OrderDao;
import com.wzg.springcloud.daomain.Order;
import com.wzg.springcloud.service.AccountService;
import com.wzg.springcloud.service.OrderService;
import com.wzg.springcloud.service.StorageService;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
* @author whlie(true){learn}
*/
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Resource
private OrderDao orderDao;
@Resource
private StorageService storageService;
@Resource
private AccountService accountService;

/**
* 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
*/

@Override
@GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class)
public void create(Order order) {
log.info("----->开始新建订单");
//新建订单
orderDao.create(order);

//扣减库存
log.info("----->订单微服务开始调用库存,做扣减Count");
storageService.decrease(order.getProductId(), order.getCount());
log.info("----->订单微服务开始调用库存,做扣减end");

//扣减账户
log.info("----->订单微服务开始调用账户,做扣减Money");
accountService.decrease(order.getUserId(), order.getMoney());
log.info("----->订单微服务开始调用账户,做扣减end");


//修改订单状态,从零到1代表已经完成
log.info("----->修改订单状态开始");
orderDao.update(order.getUserId(), 0);
log.info("----->修改订单状态结束");

log.info("----->下订单结束了");

}
}

controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.wzg.springcloud.service.iml;

import com.wzg.springcloud.dao.OrderDao;
import com.wzg.springcloud.daomain.Order;
import com.wzg.springcloud.service.AccountService;
import com.wzg.springcloud.service.OrderService;
import com.wzg.springcloud.service.StorageService;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
* @author whlie(true){learn}
*/
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Resource
private OrderDao orderDao;
@Resource
private StorageService storageService;
@Resource
private AccountService accountService;

/**
* 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
*/

@Override
@GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class)
public void create(Order order) {
log.info("----->开始新建订单");
//新建订单
orderDao.create(order);

//扣减库存
log.info("----->订单微服务开始调用库存,做扣减Count");
storageService.decrease(order.getProductId(), order.getCount());
log.info("----->订单微服务开始调用库存,做扣减end");

//扣减账户
log.info("----->订单微服务开始调用账户,做扣减Money");
accountService.decrease(order.getUserId(), order.getMoney());
log.info("----->订单微服务开始调用账户,做扣减end");


//修改订单状态,从零到1代表已经完成
log.info("----->修改订单状态开始");
orderDao.update(order.getUserId(), 0);
log.info("----->修改订单状态结束");

log.info("----->下订单结束了");

}
}

config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.wzg.springcloud.config;

/**
* @author whlie(true){learn}
*/
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;


@Configuration
public class DataSourceProxyConfig {


@Value("${mybatis.mapperLocations}")
private String mapperLocations;


@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
return new DruidDataSource();
}


@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}


@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sqlSessionFactoryBean.getObject();
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.wzg.springcloud.config;

/**
* @author whlie(true){learn}
*/
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Configuration;


@Configuration
@MapperScan({"com.wzg.springcloud.dao"})
public class MyBatisConfig {
}

主启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.wzg.springcloud;

/**
* @author whlie(true){learn}
*/
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源自动创建的配置
public class SeataOrderMainApp2001{

public static void main(String[] args){
SpringApplication.run(SeataOrderMainApp2001.class, args);
}
}

启动测试

先启动nacos-1.1.4和seata-0.9.0,再启动2001。

仿照上面 创建 alibaba-seata-storage2002 和alibaba-seata-account2003 两个模块,唯一大的区别就是这两个不需要导入 open-feign 远程调用其它模块。

Seata全局事务怎么使用

Spring提供的本地事务:@Transactional

Seata提供的全局事务:@GlobalTransactional

SEATA的分布式交易解决方案

我们只需要使用一个

注解在业务方法上:

@GlobalTransactional

数据库初始情况

下订单->减库存->扣余额->改(订单)状态

测试正常下单

启动nacos、seata、2001、2002、2003;

测试:http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100

报错

java.sql.SQLException:Failed to fetch schema of t_order

Connector/J 5.0.0以后的版本有一个名为useInformationSchema的数据库连接参数,Connector/J 在mysql8.0中默认配置连接属性useInformationSchema为true,使查询table信息时更为有效。用户依然可以配置useInformationSchema为false,但是在8.0.3及其之后的版本中,由于不能支持早期的特性,某些数据字典的查询可能会失败。

在各微服务的application.yml 文件的spring.datasource.url 后面加上&useInformationSchema=false设置useInformationSchema为false,即可解决该问题。 参考:https://www.jianshu.com/p/acc99f891e91

再次测试

访问成功

测试超时异常:不加@GlobalTransactional

AccountServiceImpl添加超时:

我们使用的是Openfeign,默认超时时长是1s,这里我们延迟30s。

当库存和账户金额扣减后,订单状态并没有设置为已经完成,没有从零改为1;而且由于feign的重试机制,账户余额还有可能被多次扣减。

测试超时异常:加@GlobalTransactional

OrderServiceImpl添加@GlobalTransactional注解,注意改注解只能用在方法上!

●name:给定全局事务实例的名称,随便取,唯一即可

●rollbackFor:当发生什么样的异常时,进行回滚

●noRollbackFor:发生什么样的异常不进行回滚。

小结

做好配置后,我们只需要使用一个 @GlobalTransactional(name = “lsp-create-order”, rollbackFor = Exception.class) 放在业务的入口,即可实现控制全局的事务。注意该注解只能放在方法上。

再看TC/TM/RM三大组件

TC:seata服务器; (我们电脑上启动的seata )
TM:事物的发起者,业务的入口。 哪个微服务使用了@GlobalTransactional哪个就是TM
RM:事务的参与者,一个数据库就是一个RM。

分布式事务的执行流程:
1TM 开启分布式事务(TM 向 TC 注册全局事务记录);
2按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );
3TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);
4TC 汇总事务信息,决定分布式事务是提交还是回滚;
5TC 通知所有 RM 提交/回滚 资源,事务二阶段结束。

AT模式(默认)如何做到对业务的无侵入

Seata有四大模式:AT(默认)、TCC、SAGA、XA。(阿里云上的AT叫做GTS,收费)
AT模式
AT模式两阶段提交协议的演变:
●一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
●二阶段:
○提交异步化,非常快速地完成。
○回滚通过一阶段的回滚日志进行反向补偿(前面insert,后面回滚时就delete)。

每个数据库除了自身存储数据的表以外,都会有一个事务回滚表:undo_log
Seata库中存在:branch_table\global_table\lock_table\distributed_lock(高版本才有)这样一些表

一阶段加载

在一阶段,Seata 会拦截“业务 SQL”,
1 解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”(前置镜像)
2 执行“业务 SQL”更新业务数据,在业务数据更新之后,
3 其保存成“after image”,最后生成行锁。
以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

二阶段回滚

二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。
回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”。如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。

整体流程图


SpringCloud学习笔记(9)
https://yztldxdz.top/2022/09/25/SpringCloud学习笔记(9)/
发布于
2022年9月25日
许可协议