集成分布式事务解决方案 - Seata

1. 概述

Seata是一款开源的分布式事务解决方案框架,用于提供各种模式(包括无业务代码侵入性)的事务模式,其全称是Simple Extensible Autonomous Transaction ArchitectureSeata框架于2019年由阿里巴巴集团正式开源,同时于2023年10月将该项目捐赠于Apache基金会,该项目也广泛应用于阿里巴巴集团内部交易 / 订单的分布式事务一致性

Seata主要解决的是在“微服务时代”散落各地不同数据源的事务问题,直接理解起来有些许抽象,参考Seata官网提供的例子,架构图如下所示:

transaction-demo

上图展示了3个核心微服务:

  • Stock:负责仓储信息,用于对指定库存信息进行扣减
  • Order:负责订单信息,用于根据需求进行采购并创建订单
  • Account:负责账户信息,用于从用户账户中扣除余额

其中Business作为业务触发点进行扣减库存 - 创建订单 - 扣减账户余额等一系列操作,在常见的微服务架构中,各个服务之间的操作均是不可逆的,意思是当调用Stock服务进行库存扣减后,该行为不支持回滚,因此在后续的扣减账户余额这一步骤中,如果出现了任何的异常(比方说账户余额不足以进行扣减),库存的扣减补偿动作无疑只能重新通过Stock服务提供的增加库存API进行操作

而理想的情况下(可以将这三个服务想象成均存在于一个单体服务中),这几个动作应当在同一个事务中,并且在任意一步出现了异常,都可以进行数据回滚,而不需要基于API能力的形式进行补偿,而跨服务的全局事务,则是Seata框架所提供的解决方案

Seata框架提供了一个@GlobalTransactional注解用于标识开启跨服务的全局事务,使用方式如下所示:

1
2
3
4
@GlobalTransactional
public void createOrder(long accountId, long storageId, int quantity) {
......
}

其中,Seata解决方案中涉及到的专有名词为以下:

  • TC:全称为Tranaction Coordinator,中文为事务协调者,主要负责维护全局事务和分支事务的状态,决策最终是做事务提交还是事务回滚,也就是下述即将提到的Seata Server
  • TM:全称为Transaction Manager,中文为事务管理者,主要负责开启全局事务,提交或回滚全局事务,用于定义全局事务的范围
  • RM:全称为Resource Manager,中文为资源管理器,主要负责与TC通讯分支事务状态以及注册分支事务,并且用于驱动事务回滚以及事务提交

2. 架构

下述将基于Seata官网的订单 - 账户 - 库存样例进行搭建实践全局事务管理,其中涉及到的服务套件包含以下:

  • Nacos:使用其作为3个核心微服务与seata-server的服务注册中心与配置中心
  • Seata-ServerSeata提到的事务协调者角色,就由该服务充当,所有的业务微服务都将与seata-sever进行通讯以便形成全局事务控制
  • 业务服务:用于塑造下单 - 扣减库存 - 扣减余额的典型业务场景,并验证Seata分布式事务解决方案的有效性

整体架构如下所示:

seata-demo

3. Seata Server部署

Tips: 可参考官网提供的Seata Server部署方式,以下所述的部署方式包括业务代码均已上传至GitHub中,可通过https://github.com/zchengb/seata-demo进行代码拉取

到了部署seata-server这一步,其实Seata官网提供了不同的部署方式,但基于笔者本地环境,选择了基于Nacos官网提供的docker-compose部署方式(https://github.com/nacos-group/nacos-docker)与 Seata官网提供的Docker镜像进行结合部署,整体结合后的docker-compose.yml如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
version: "3.8"
services:
nacos:
image: nacos/nacos-server:${NACOS_VERSION}
platform: linux/amd64
container_name: nacos
env_file:
- ./env/nacos-standlone-mysql.env
volumes:
- ./standalone-logs/:/home/nacos/logs
ports:
- "8848:8848"
- "9848:9848"
depends_on:
mysql:
condition: service_healthy
restart: always
mysql:
container_name: mysql
platform: linux/amd64
build:
context: .
dockerfile: ./image/mysql/8/Dockerfile
image: example/mysql:8.0.30
env_file:
- ./env/mysql.env
volumes:
- ./mysql:/var/lib/mysql
healthcheck:
test: [ "CMD", "mysqladmin" ,"ping", "-h", "localhost" ]
interval: 5s
timeout: 10s
retries: 10
seata:
image: seataio/seata-server:1.6.1
platform: linux/amd64
container_name: seata
volumes:
- ./seata-server:/seata-server/resources
ports:
- "8091:8091"
- "7091:7091"
environment:
- SEATA_IP=当前本机的IP地址
depends_on:
- nacos

Tips: MySQL容器的初始化账号密码均为root

其中涉及到的seata-server启动配置项文件application.yaml则是通过docker cp seata:/seata-server/resources ./seata-server的方式拷贝后进行持久化挂载(参考seata.volumes参数)

其中需要注意的是seata-server容器中环境的配置项SEATA_IP,该IP值不可以设置为127.0.0.1localhost,该配置项主要用于服务注册时,默认在Docker容器环境中,服务注册上送至Nacos的IP地址将会是容器IP地址,因此需要进行变更,以便后续业务服务能够从Nacos中进行服务发现,获取seata-server的IP地址,如下所示:

nacos中seata-server的服务注册信息

seata-server涉及的完整application.yaml内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
server:
port: 7091

spring:
application:
name: seata-server

logging:
config: classpath:logback-spring.xml
file:
path: ${user.home}/logs/seata
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash

console:
user:
username: seata
password: seata

seata:
config:
# support: nacos, consul, apollo, zk, etcd3
type: nacos
nacos:
server-addr: nacos:8848
namespace: eed4a53a-2ee3-482e-ae60-de29641557fe
data-id: seataServer.properties
group: DEFAULT_GROUP
username: nacos
password: nacos
registry:
# support: nacos, eureka, redis, zk, consul, etcd3, sofa
type: nacos
nacos:
server-addr: nacos:8848
namespace: eed4a53a-2ee3-482e-ae60-de29641557fe
group: DEFAULT_GROUP

security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login

需要注意的是seata-server所用到的命名空间(seata.config.nacos.namespaceseata.registry.nacos.namespace)不建议使用默认的public命名空间,会导致循环拉取配置,日志无限输出的情况,另外在配置命名空间的时候,应当配置的是命名空间的ID,具体可参考Seata集成nacos疯狂打印日志bug解决

有关seata-server所需要的配置信息,存放至nacos对应命名空间下,其中data-idseataServer.properties,整体配置信息如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none

#Transaction routing rules configuration, only for the client
service.vgroupMapping.default_tx_group=default
#If you use a registry, you can ignore it
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false

#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h

#Log rule configuration, for client and server
log.exceptionRate=100

#Transaction storage configuration, only for the server. The file, db, and redis configuration values are optional.
store.mode=db
store.lock.mode=db
store.session.mode=db


store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://mysql:3306/seata-server?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000

#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
server.enableParallelRequestHandle=false

上述配置跟seata官方提供的默认配置中主要的差异点在于,将相关的事务相关的存储介质从文件变更为数据库(MySQL),主要参考上述的store.mode配置项,对应需要将数据库连接信息进行同步更新store.db.urlstore.db.userstore.db.password

既然上述提到了采用MySQL数据库的方式来存储相关的事务信息,那么seata-server所涉及的数据表是需要初始化载入到数据库中的,以下是相关的初始化表SQL语句(摘选于Seata官方):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);

至此,seata-server部署所需要的配置均已完成,可通过http://localhost:7091/#/login访问seata-server的后台进行观察,初始化账号密码均为seata,页面效果如下所示:

seata-admin-demo

4. 业务服务

Tips: 涉及到的业务服务代码可从https://github.com/zchengb/seata-demo进行拉取

业务服务主要用于塑造下单 - 扣减库存 - 扣减余额的典型业务场景,并验证Seata分布式事务解决方案的有效性

业务服务主要涉及:订单服务、账户服务和库存服务,其中服务注册发现以及配置获取主要结合了nacos服务,订单服务相关的application.yaml配置如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
seata:
enabled: true
registry:
type: nacos
nacos:
namespace: eed4a53a-2ee3-482e-ae60-de29641557fe
group: DEFAULT_GROUP
server-addr: localhost:8848
config:
type: nacos
nacos:
namespace: eed4a53a-2ee3-482e-ae60-de29641557fe
group: DEFAULT_GROUP
server-addr: localhost:8848
tx-service-group: default_tx_group
server:
port: 8081
spring:
jpa:
show-sql: true
application:
name: order
cloud:
nacos:
config:
namespace: eed4a53a-2ee3-482e-ae60-de29641557fe
server-addr: localhost:8848
discovery:
namespace: eed4a53a-2ee3-482e-ae60-de29641557fe
server-addr: localhost:8848
config:
import: nacos:order.yaml?refresh=true

此处需要注意的是涉及到的nacos.xxx.namespace需要与seata-server服务的命名空间保持一致(因为业务服务作为RM角色需要向TC角色也就是seata-server进行注册,因此这一过程涉及到服务间的交互,也就意味着需要使用到nacos的服务发现)

上述application.yaml中缺失的datasource相关配置放置于nacos配置中心中

需要引入的相关gradle依赖如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
plugins {
id 'org.springframework.boot' version '2.6.13'
id 'io.spring.dependency-management' version '1.0.15.RELEASE'
id 'java'
}

group = 'cn.zchengb'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
compileOnly {
extendsFrom annotationProcessor
}
}

repositories {
mavenCentral()
}

ext {
set('springCloudAlibabaVersion', "2021.0.5.0")
set('springCloudVersion', "2021.0.5")
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.cloud:spring-cloud-starter-openfeign'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config'
implementation 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-discovery'
implementation 'org.springframework.cloud:spring-cloud-starter-loadbalancer'
implementation 'com.alibaba.cloud:spring-cloud-starter-alibaba-seata'
implementation 'org.flywaydb:flyway-core'
implementation 'mysql:mysql-connector-java'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

dependencyManagement {
imports {
mavenBom "com.alibaba.cloud:spring-cloud-alibaba-dependencies:${springCloudAlibabaVersion}"
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}

tasks.named('test') {
useJUnitPlatform()
}

在集成seata前,需要为每个业务服务对应的数据库创建seata所需的undo-log数据表用于存储过程中涉及到的事务信息,undo-log数据表的初始化SQL如下所示(参考Seata官方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';

关联的业务核心代码如下所示:

  • 订单服务 - 创建订单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
@RequiredArgsConstructor
public class OrderApplicationService {
private final StorageClient storageClient;
private final AccountClient accountClient;
private final OrderRepository orderRepository;

@GlobalTransactional
public void createOrder(long accountId, long storageId, int quantity) {
if (quantity <= 0) {
throw new IllegalArgumentException("quantity must be positive.");
}

var price = storageClient.fetchLatestPrice(storageId);
price = price * quantity;
accountClient.deductBalance(accountId, price);
storageClient.deductStorage(storageId, quantity);

var order = Order.create(accountId, storageId, price, quantity);
orderRepository.save(order);
}
}
  • 账户服务 - 扣减余额
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service
@RequiredArgsConstructor
public class AccountApplicationService {
private final AccountRepository accountRepository;

public void balanceDeduct(Long accountId, long deductValue) {
if (deductValue <= 0) {
throw new IllegalArgumentException("deduct value must be positive.");
}

var account = accountRepository.findById(accountId).orElseThrow(
() -> new IllegalArgumentException("cannot found account")
);

account.deduct(deductValue);
accountRepository.save(account);
}
}
  • 库存服务 - 扣减库存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Service
@RequiredArgsConstructor
public class StorageApplicationService {
private final StorageRepository storageRepository;

public void decreaseStorage(long storageId, int quantity) {
var storage = findStorage(storageId);
storage.decreaseStorage(quantity);
storageRepository.save(storage);
}

public long getLatestPrice(long storageId) {
return findStorage(storageId).getPrice();
}

private Storage findStorage(long storageId) {
return storageRepository.findById(storageId).orElseThrow(() -> new IllegalArgumentException("storage not found"));
}
}

其中seata所体现的分布式事务主要作用于订单服务中的@GlobalTransactional注解,当出现扣减库存 / 余额失败 / 创建订单失败时,将会对各个业务服务涉及到的数据库操作进行SQL回滚,如下所示:

global-transactional-demo

EOF. 相关链接