分布式事务 | 使用DTM 的Saga 模式(二)


风晓
风晓 2023-12-31 09:27:34 49796 赞同 0 反对 0
分类: 资源
接上一篇分布式事务 | 使用DTM 的Saga 模式(一)

转出子事务(TransferOut)

 
 
[HttpPost("TransferOut")]
 
public async Task<IActionResult> TransferOut([FromBody] TransferRequest request)
 
{
 
var msg = $"用户{request.UserId}转出{request.Amount}元";
 
_logger.LogInformation($"转出子事务-启动:{msg}");
 
// 1. 创建子事务屏障
 
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
 
try
 
{
 
using (var conn = _context.Database.GetDbConnection())
 
{
 
// 2. 在子事务屏障内执行事务操作
 
await branchBarrier.Call(conn, async (tx) =>
 
{
 
_logger.LogInformation($"转出子事务-执行:{msg}");
 
await _context.Database.UseTransactionAsync(tx);
 
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
 
if (bankAccount == null || bankAccount.Balance < request.Amount)
 
throw new InvalidDataException("账户不存在或余额不足!");
 
bankAccount.Balance -= request.Amount;
 
await _context.SaveChangesAsync();
 
});
 
}
 
}
 
catch (InvalidDataException ex)
 
{
 
_logger.LogInformation($"转出子事务-失败:{ex.Message}");
 
// 3. 按照接口协议,返回409,以表示子事务失败
 
return new StatusCodeResult(StatusCodes.Status409Conflict);
 
}
 
_logger.LogInformation($"转出子事务-成功:{msg}");
 
return Ok();
 
}
 

以上代码中有几点需要额外注意:

  1. 使用Saga模式,必须开启子事务屏障:_barrierFactory.CreateBranchBarrier(Request.Query),其中Request.Query中的参数由DTM 生成,类似:?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga,主要包含四个参数:
    1. gid:全局事务Id
    2. trans_type:事务类型,是saga、msg、xa或者是tcc。
    3. branch_id:子事务的Id
    4. op:当前操作,对于Saga事务模式,要么为action(正向操作),要么为compensate(补偿操作)。
  2. 必须在子事务屏障内执行事务操作:branchBarrier.Call(conn, async (tx) =>{}
  3. 对于Saga正向操作而言,业务上的失败与异常是需要做严格区分的,例如前面的余额不足,是业务上的失败,必须回滚。而对于网络抖动等其他外界原因导致的事务失败,属于业务异常,则需要重试。因此若因业务失败(这里是账户不存在或余额不足)而导致子事务失败,则必须通过抛异常的方式并返回**409**状态码以告知DTM 子事务失败。
  4. 以上通过抛出异常的方式中断子事务执行并在外围捕获特定异常返回409状态码。在外围捕获异常时切忌放大异常捕获,比如直接catch(Exception),如此会捕获由于网络等其他原因导致的异常,而导致DTM 不再自动处理该异常,比如业务异常时的自动重试。

转出补偿子事务(TransferOut_Compensate)

转出补偿,就是回滚转出操作,进行账户余额归还,实现如下:

 
 
[HttpPost("TransferOut_Compensate")]
 
public async Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request)
 
{
 
var msg = $"用户{request.UserId}回滚转出{request.Amount}元";
 
_logger.LogInformation($"转出补偿子事务-启动:{msg}");
 
// 1. 创建子事务屏障
 
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
 
using (var conn = _context.Database.GetDbConnection())
 
{
 
// 在子事务屏障内执行事务操作
 
await branchBarrier.Call(conn, async (tx) =>
 
{
 
_logger.LogInformation($"转出补偿子事务-执行:{msg}");
 
await _context.Database.UseTransactionAsync(tx);
 
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
 
if (bankAccount == null)
 
return; //对于补偿操作,可直接返回,中断后续操作
 
bankAccount.Balance += request.Amount;
 
await _context.SaveChangesAsync();
 
});
 
}
 
_logger.LogInformation($"转出补偿子事务-成功!");
 
// 2. 因补偿操作必须成功,所以必须返回200。
 
return Ok();
 
}
 

由于DTM设计为总是执行补偿,也就是说即使正向操作子事务失败时,DTM 仍旧会执行补偿逻辑。但子事务屏障会在执行时判断正向操作的执行状态,当子事务失败时,并不会执行补偿逻辑。
另外DTM的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功因此在补偿子事务中,即使补偿子事务中出现业务失败时,也必须返回**200**因此当出现bankAccount==null时可以直接 return。

转入子事务(TransferIn)

转入子事务和转出子事务的实现基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并通过抛异常的方式并最终返回409状态码来显式告知DTM 子事务执行失败。

 
 
[HttpPost("TransferIn")]
 
public async Task<IActionResult> TransferIn([FromBody] TransferRequest request)
 
{
 
var msg = $"用户{request.UserId}转入{request.Amount}元";
 
_logger.LogInformation($"转入子事务-启动:{msg}");
 
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
 
try
 
{
 
using (var conn = _context.Database.GetDbConnection())
 
{
 
await branchBarrier.Call(conn, async (tx) =>
 
{
 
_logger.LogInformation($"转入子事务-执行:{msg}");
 
await _context.Database.UseTransactionAsync(tx);
 
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
 
if (bankAccount == null)
 
throw new InvalidDataException("账户不存在!");
 
bankAccount.Balance += request.Amount;
 
await _context.SaveChangesAsync();
 
});
 
}
 
}
 
catch (InvalidDataException ex)
 
{
 
_logger.LogInformation($"转入子事务-失败:{ex.Message}");
 
return new StatusCodeResult(StatusCodes.Status409Conflict);
 
}
 
_logger.LogInformation($"转入子事务-成功:{msg}");
 
return Ok();
 
}
 

转入补偿子事务(TransferIn_Compensate)

转入补偿子事务和转出补偿子事务的实现也基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并最终返回200状态码来告知DTM 补偿子事务执行成功。

 
 
[HttpPost("TransferIn_Compensate")]
 
public async Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request)
 
{
 
var msg = "用户{request.UserId}回滚转入{request.Amount}元";
 
_logger.LogInformation($"转入补偿子事务-启动:{msg}");
 
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
 
using (var conn = _context.Database.GetDbConnection())
 
{
 
await branchBarrier.Call(conn, async (tx) =>
 
{
 
_logger.LogInformation($"转入补偿子事务-执行:{msg}");
 
await _context.Database.UseTransactionAsync(tx);
 
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
 
if (bankAccount == null) return;
 
bankAccount.Balance -= request.Amount;
 
await _context.SaveChangesAsync();
 
});
 
}
 
_logger.LogInformation($"转入补偿子事务-成功!");
 
return Ok();
 
}
 

编排Saga事务

拆分完子事务,最后就可以进行Saga事务编排了,其代码如下所示:

 
 
[HttpPost("Transfer")]
 
public async Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,
 
CancellationToken cancellationToken)
 
{
 
try
 
{
 
_logger.LogInformation($"转账事务-启动:用户{fromUserId}转账{amount}元到用户{toUserId}");
 
//1. 生成全局事务ID
 
var gid = await _dtmClient.GenGid(cancellationToken);
 
var bizUrl = _configuration.GetValue<string>("TransferBaseURL");
 
//2. 创建Saga
 
var saga = _transFactory.NewSaga(gid);
 
//3. 添加子事务
 
saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",
 
new TransferRequest(fromUserId, amount))
 
.Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",
 
new TransferRequest(toUserId, amount))
 
.EnableWaitResult(); // 4. 按需启用是否等待事务执行结果
 
 
 
//5. 提交Saga事务
 
await saga.Submit(cancellationToken);
 
}
 
catch (DtmException ex) // 6. 如果开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。
 
{
 
_logger.LogError($"转账事务-失败:用户{fromUserId}转账{amount}元到用户{toUserId}失败!");
 
return new BadRequestObjectResult($"转账失败:{ex.Message}");
 
}
 
 
 
_logger.LogError($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
 
return Ok($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
 
}
 

主要步骤如下:

  1. 生成全局事务Id:var gid =await _dtmClient.GenGid(cancellationToken);
  2. 创建Saga全局事务:_transFactory.NewSaga(gid);
  3. 添加子事务:saga.Add(string action, string compensate, object postData);包含正向和反向子事务。
  4. 如果依赖事务执行结果,可通过EnableWaitResult()开启事务结果等待。
  5. 提交Saga全局事务:saga.Submit(cancellationToken);
  6. 若开启了事务结果等待,可以通过try...catch..来捕获DtmExcepiton异常来获取事务执行异常信息。

运行项目

既然DTM作为一个独立的服务存在,其负责通过HTTPgRPC协议发起子事务的调用,因此首先需要启动一个DTM实例,又由于本项目依赖MySQL,因此我们采用Docker Compose的方式来启动项目。在Visual Studio中通过右键项目->Add->Docker Support->Linux 即可添加Dockerfile如下所示:

 
 
FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
 
WORKDIR /app
 
EXPOSE 80
 
EXPOSE 443
 
 
 
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
 
WORKDIR /src
 
COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"]
 
RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj"
 
COPY . .
 
WORKDIR "/src/DtmDemo.WebApi"
 
RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/build
 
 
 
FROM build AS publish
 
RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publish
 
 
 
FROM base AS final
 
WORKDIR /app
 
COPY --from=publish /app/publish .
 
ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"]
 

在Visual Studio中通过右键项目->Add Container Orchestrator Support->Docker Compose即可添加docker-compose.yml,由于整个项目依赖mysqlDTM,修改docker-compose.yml如下所示,其中定义了三个服务:db,dtm和dtmdemo.webapi。

 
 
version: '3.4'
 
services:
 
db:
 
image: 'mysql:5.7'
 
container_name: dtm-mysql
 
environment:
 
MYSQL_ROOT_PASSWORD: 123456 # 指定MySQL初始密码
 
volumes:
 
- ./docker/mysql/scripts:/docker-entrypoint-initdb.d # 挂载用于初始化数据库的脚本
 
ports:
 
- '3306:3306'
 
dtm:
 
depends_on: ["db"]
 
image: 'yedf/dtm:latest'
 
container_name: dtm-svc
 
environment:
 
IS_DOCKER: '1'
 
STORE_DRIVER: mysql # 指定使用MySQL持久化DTM事务数据
 
STORE_HOST: db # 指定MySQL服务名,这里是db
 
STORE_USER: root
 
STORE_PASSWORD: '123456'
 
STORE_PORT: 3306
 
STORE_DB: "dtm" # 指定DTM 数据库名
 
ports:
 
- '36789:36789' # DTM HTTP 端口
 
- '36790:36790' # DTM gRPC 端口
 
dtmdemo.webapi:
 
depends_on: ["dtm", "db"]
 
image: ${DOCKER_REGISTRY-}dtmdemowebapi
 
environment:
 
ASPNETCORE_ENVIRONMENT: docker # 设定启动环境为docker
 
container_name: dtm-webapi-demo
 
build:
 
context: .
 
dockerfile: DtmDemo.WebApi/Dockerfile
 
ports:
 
- '31293:80' # 映射Demo:80端口到本地31293端口
 
- '31294:443' # 映射Demo:443端口到本地31294端口
 
 
 

其中dtmdemo.webapi服务通过ASPNETCORE_ENVIRONMENT: docker指定启动环境为docker,因此需要在项目下添加appsettings.docker.json以配置应用参数:

 
 
{
 
"ConnectionStrings": {
 
"DtmDemoWebApiContext": "Server=db;port=3306;database=dtm_barrier;user id=root;password=123456;AllowLoadLocalInfile=true"
 
},
 
"TransferBaseURL": "http://dtmdemo.webapi/api/SagaDemo",
 
"dtm": {
 
"DtmUrl": "http://dtm:36789",
 
"DtmTimeout": 10000,
 
"BranchTimeout": 10000,
 
"DBType": "mysql",
 
"BarrierTableName": "dtm_barrier.barrier"
 
}
 
}
 
 
 

另外db服务中通过volumes: ["./docker/mysql/scripts:/docker-entrypoint-initdb.d"]来挂载初始化脚本,以创建DTM依赖的MySQL 存储数据库dtm和示例项目使用子事务屏障需要的barrier数据表。脚本如下:

 
 
CREATE DATABASE IF NOT EXISTS dtm
 
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
 
;
 
drop table IF EXISTS dtm.trans_global;
 
CREATE TABLE if not EXISTS dtm.trans_global (
 
`id` bigint(22) NOT NULL AUTO_INCREMENT,
 
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
 
`trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg',
 
`status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',
 
`query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow',
 
`protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc',
 
`create_time` datetime DEFAULT NULL,
 
`update_time` datetime DEFAULT NULL,
 
`finish_time` datetime DEFAULT NULL,
 
`rollback_time` datetime DEFAULT NULL,
 
`options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout',
 
`custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction',
 
`next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
 
`next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
 
`owner` varchar(128) not null default '' comment 'who is locking this trans',
 
`ext_data` TEXT comment 'result for this trans. currently used in workflow pattern',
 
`result` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
 
`rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
 
PRIMARY KEY (`id`),
 
UNIQUE KEY `gid` (`gid`),
 
key `owner`(`owner`),
 
key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
 
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
 
drop table IF EXISTS dtm.trans_branch_op;
 
CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
 
`id` bigint(22) NOT NULL AUTO_INCREMENT,
 
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
 
`url` varchar(1024) NOT NULL COMMENT 'the url of this op',
 
`data` TEXT COMMENT 'request body, depreceated',
 
`bin_data` BLOB COMMENT 'request body',
 
`branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID',
 
`op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',
 
`status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed',
 
`finish_time` datetime DEFAULT NULL,
 
`rollback_time` datetime DEFAULT NULL,
 
`create_time` datetime DEFAULT NULL,
 
`update_time` datetime DEFAULT NULL,
 
PRIMARY KEY (`id`),
 
UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
 
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
 
drop table IF EXISTS dtm.kv;
 
CREATE TABLE IF NOT EXISTS dtm.kv (
 
`id` bigint(22) NOT NULL AUTO_INCREMENT,
 
`cat` varchar(45) NOT NULL COMMENT 'the category of this data',
 
`k` varchar(128) NOT NULL,
 
`v` TEXT,
 
`version` bigint(22) default 1 COMMENT 'version of the value',
 
create_time datetime default NULL,
 
update_time datetime DEFAULT NULL,
 
PRIMARY KEY (`id`),
 
UNIQUE key `uniq_k`(`cat`, `k`)
 
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
 
 
 
 
 
create database if not exists dtm_barrier
 
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
 
;
 
drop table if exists dtm_barrier.barrier;
 
create table if not exists dtm_barrier.barrier(
 
id bigint(22) PRIMARY KEY AUTO_INCREMENT,
 
trans_type varchar(45) default '',
 
gid varchar(128) default '',
 
branch_id varchar(128) default '',
 
op varchar(45) default '',
 
barrier_id varchar(45) default '',
 
reason varchar(45) default '' comment 'the branch type who insert this record',
 
create_time datetime DEFAULT now(),
 
update_time datetime DEFAULT now(),
 
key(create_time),
 
key(update_time),
 
UNIQUE key(gid, branch_id, op, barrier_id)
 
);
 

如果您发现该资源为电子书等存在侵权的资源或对该资源描述不正确等,可点击“私信”按钮向作者进行反馈;如作者无回复可进行平台仲裁,我们会在第一时间进行处理!

评价 0 条
风晓L1
粉丝 1 资源 2038 + 关注 私信
最近热门资源
银河麒麟桌面操作系统备份用户数据  125
统信桌面专业版【全盘安装UOS系统】介绍  120
银河麒麟桌面操作系统安装佳能打印机驱动方法  112
银河麒麟桌面操作系统 V10-SP1用户密码修改  105
最近下载排行榜
银河麒麟桌面操作系统备份用户数据 0
统信桌面专业版【全盘安装UOS系统】介绍 0
银河麒麟桌面操作系统安装佳能打印机驱动方法 0
银河麒麟桌面操作系统 V10-SP1用户密码修改 0
作者收入月榜
1

prtyaa 收益393.62元

2

zlj141319 收益218元

3

1843880570 收益214.2元

4

IT-feng 收益209.03元

5

风晓 收益208.24元

6

777 收益172.71元

7

Fhawking 收益106.6元

8

信创来了 收益105.84元

9

克里斯蒂亚诺诺 收益91.08元

10

技术-小陈 收益79.5元

请使用微信扫码

加入交流群

请使用微信扫一扫!