|
[ | ]
|
public async Task<IActionResult> TransferOut([FromBody] TransferRequest request) |
|
{ |
|
var msg = $"用户{request.UserId}转出{request.Amount}元"; |
|
_logger.LogInformation($"转出子事务-启动:{msg}"); |
|
// 1. 创建子事务屏障 |
|
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query); |
|
try |
|
{ |
|
using (var conn = _context.Database.GetDbConnection()) |
|
{ |
|
// 2. 在子事务屏障内执行事务操作 |
|
await branchBarrier.Call(conn, async (tx) => |
|
{ |
|
_logger.LogInformation($"转出子事务-执行:{msg}"); |
|
await _context.Database.UseTransactionAsync(tx); |
|
var bankAccount = await _context.BankAccount.FindAsync(request.UserId); |
|
if (bankAccount == null || bankAccount.Balance < request.Amount) |
|
throw new InvalidDataException("账户不存在或余额不足!"); |
|
bankAccount.Balance -= request.Amount; |
|
await _context.SaveChangesAsync(); |
|
}); |
|
} |
|
} |
|
catch (InvalidDataException ex) |
|
{ |
|
_logger.LogInformation($"转出子事务-失败:{ex.Message}"); |
|
// 3. 按照接口协议,返回409,以表示子事务失败 |
|
return new StatusCodeResult(StatusCodes.Status409Conflict); |
|
} |
|
_logger.LogInformation($"转出子事务-成功:{msg}"); |
|
return Ok(); |
|
} |
以上代码中有几点需要额外注意:
_barrierFactory.CreateBranchBarrier(Request.Query)
,其中Request.Query
中的参数由DTM 生成,类似:?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga
,主要包含四个参数:
branchBarrier.Call(conn, async (tx) =>{}
**409**
状态码以告知DTM 子事务失败。409
状态码。在外围捕获异常时切忌放大异常捕获,比如直接catch(Exception)
,如此会捕获由于网络等其他原因导致的异常,而导致DTM 不再自动处理该异常,比如业务异常时的自动重试。转出补偿,就是回滚转出操作,进行账户余额归还,实现如下:
|
[ | ]
|
public async Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request) |
|
{ |
|
var msg = $"用户{request.UserId}回滚转出{request.Amount}元"; |
|
_logger.LogInformation($"转出补偿子事务-启动:{msg}"); |
|
// 1. 创建子事务屏障 |
|
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query); |
|
using (var conn = _context.Database.GetDbConnection()) |
|
{ |
|
// 在子事务屏障内执行事务操作 |
|
await branchBarrier.Call(conn, async (tx) => |
|
{ |
|
_logger.LogInformation($"转出补偿子事务-执行:{msg}"); |
|
await _context.Database.UseTransactionAsync(tx); |
|
var bankAccount = await _context.BankAccount.FindAsync(request.UserId); |
|
if (bankAccount == null) |
|
return; //对于补偿操作,可直接返回,中断后续操作 |
|
bankAccount.Balance += request.Amount; |
|
await _context.SaveChangesAsync(); |
|
}); |
|
} |
|
_logger.LogInformation($"转出补偿子事务-成功!"); |
|
// 2. 因补偿操作必须成功,所以必须返回200。 |
|
return Ok(); |
|
} |
由于DTM设计为总是执行补偿,也就是说即使正向操作子事务失败时,DTM 仍旧会执行补偿逻辑。但子事务屏障会在执行时判断正向操作的执行状态,当子事务失败时,并不会执行补偿逻辑。
另外DTM的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功。因此在补偿子事务中,即使补偿子事务中出现业务失败时,也必须返回**200**
。因此当出现bankAccount==null
时可以直接 return。
转入子事务和转出子事务的实现基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}
中实现事务逻辑,并通过抛异常的方式并最终返回409
状态码来显式告知DTM 子事务执行失败。
|
[ | ]
|
public async Task<IActionResult> TransferIn([FromBody] TransferRequest request) |
|
{ |
|
var msg = $"用户{request.UserId}转入{request.Amount}元"; |
|
_logger.LogInformation($"转入子事务-启动:{msg}"); |
|
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query); |
|
try |
|
{ |
|
using (var conn = _context.Database.GetDbConnection()) |
|
{ |
|
await branchBarrier.Call(conn, async (tx) => |
|
{ |
|
_logger.LogInformation($"转入子事务-执行:{msg}"); |
|
await _context.Database.UseTransactionAsync(tx); |
|
var bankAccount = await _context.BankAccount.FindAsync(request.UserId); |
|
if (bankAccount == null) |
|
throw new InvalidDataException("账户不存在!"); |
|
bankAccount.Balance += request.Amount; |
|
await _context.SaveChangesAsync(); |
|
}); |
|
} |
|
} |
|
catch (InvalidDataException ex) |
|
{ |
|
_logger.LogInformation($"转入子事务-失败:{ex.Message}"); |
|
return new StatusCodeResult(StatusCodes.Status409Conflict); |
|
} |
|
_logger.LogInformation($"转入子事务-成功:{msg}"); |
|
return Ok(); |
|
} |
转入补偿子事务和转出补偿子事务的实现也基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}
中实现事务逻辑,并最终返回200
状态码来告知DTM 补偿子事务执行成功。
|
[ | ]
|
public async Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request) |
|
{ |
|
var msg = "用户{request.UserId}回滚转入{request.Amount}元"; |
|
_logger.LogInformation($"转入补偿子事务-启动:{msg}"); |
|
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query); |
|
using (var conn = _context.Database.GetDbConnection()) |
|
{ |
|
await branchBarrier.Call(conn, async (tx) => |
|
{ |
|
_logger.LogInformation($"转入补偿子事务-执行:{msg}"); |
|
await _context.Database.UseTransactionAsync(tx); |
|
var bankAccount = await _context.BankAccount.FindAsync(request.UserId); |
|
if (bankAccount == null) return; |
|
bankAccount.Balance -= request.Amount; |
|
await _context.SaveChangesAsync(); |
|
}); |
|
} |
|
_logger.LogInformation($"转入补偿子事务-成功!"); |
|
return Ok(); |
|
} |
拆分完子事务,最后就可以进行Saga事务编排了,其代码如下所示:
|
[ | ]
|
public async Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount, |
|
CancellationToken cancellationToken) |
|
{ |
|
try |
|
{ |
|
_logger.LogInformation($"转账事务-启动:用户{fromUserId}转账{amount}元到用户{toUserId}"); |
|
//1. 生成全局事务ID |
|
var gid = await _dtmClient.GenGid(cancellationToken); |
|
var bizUrl = _configuration.GetValue<string>("TransferBaseURL"); |
|
//2. 创建Saga |
|
var saga = _transFactory.NewSaga(gid); |
|
//3. 添加子事务 |
|
saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate", |
|
new TransferRequest(fromUserId, amount)) |
|
.Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate", |
|
new TransferRequest(toUserId, amount)) |
|
.EnableWaitResult(); // 4. 按需启用是否等待事务执行结果 |
|
|
|
//5. 提交Saga事务 |
|
await saga.Submit(cancellationToken); |
|
} |
|
catch (DtmException ex) // 6. 如果开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。 |
|
{ |
|
_logger.LogError($"转账事务-失败:用户{fromUserId}转账{amount}元到用户{toUserId}失败!"); |
|
return new BadRequestObjectResult($"转账失败:{ex.Message}"); |
|
} |
|
|
|
_logger.LogError($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!"); |
|
return Ok($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!"); |
|
} |
主要步骤如下:
var gid =await _dtmClient.GenGid(cancellationToken);
_transFactory.NewSaga(gid);
saga.Add(string action, string compensate, object postData);
包含正向和反向子事务。EnableWaitResult()
开启事务结果等待。saga.Submit(cancellationToken);
try...catch..
来捕获DtmExcepiton
异常来获取事务执行异常信息。既然DTM作为一个独立的服务存在,其负责通过HTTP
或gRPC
协议发起子事务的调用,因此首先需要启动一个DTM实例,又由于本项目依赖MySQL,因此我们采用Docker Compose的方式来启动项目。在Visual Studio中通过右键项目->Add->Docker Support->Linux
即可添加Dockerfile
如下所示:
|
FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base |
|
WORKDIR /app |
|
EXPOSE 80 |
|
EXPOSE 443 |
|
|
|
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build |
|
WORKDIR /src |
|
COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"] |
|
RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj" |
|
COPY . . |
|
WORKDIR "/src/DtmDemo.WebApi" |
|
RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/build |
|
|
|
FROM build AS publish |
|
RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publish |
|
|
|
FROM base AS final |
|
WORKDIR /app |
|
COPY --from=publish /app/publish . |
|
ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"] |
在Visual Studio中通过右键项目->Add Container Orchestrator Support->Docker Compose
即可添加docker-compose.yml
,由于整个项目依赖mysql
和DTM
,修改docker-compose.yml
如下所示,其中定义了三个服务:db,dtm和dtmdemo.webapi。
|
version: '3.4' |
|
services: |
|
db: |
|
image: 'mysql:5.7' |
|
container_name: dtm-mysql |
|
environment: |
|
MYSQL_ROOT_PASSWORD: 123456 # 指定MySQL初始密码 |
|
volumes: |
|
- ./docker/mysql/scripts:/docker-entrypoint-initdb.d # 挂载用于初始化数据库的脚本 |
|
ports: |
|
- '3306:3306' |
|
dtm: |
|
depends_on: ["db"] |
|
image: 'yedf/dtm:latest' |
|
container_name: dtm-svc |
|
environment: |
|
IS_DOCKER: '1' |
|
STORE_DRIVER: mysql # 指定使用MySQL持久化DTM事务数据 |
|
STORE_HOST: db # 指定MySQL服务名,这里是db |
|
STORE_USER: root |
|
STORE_PASSWORD: '123456' |
|
STORE_PORT: 3306 |
|
STORE_DB: "dtm" # 指定DTM 数据库名 |
|
ports: |
|
- '36789:36789' # DTM HTTP 端口 |
|
- '36790:36790' # DTM gRPC 端口 |
|
dtmdemo.webapi: |
|
depends_on: ["dtm", "db"] |
|
image: ${DOCKER_REGISTRY-}dtmdemowebapi |
|
environment: |
|
ASPNETCORE_ENVIRONMENT: docker # 设定启动环境为docker |
|
container_name: dtm-webapi-demo |
|
build: |
|
context: . |
|
dockerfile: DtmDemo.WebApi/Dockerfile |
|
ports: |
|
- '31293:80' # 映射Demo:80端口到本地31293端口 |
|
- '31294:443' # 映射Demo:443端口到本地31294端口 |
|
其中dtmdemo.webapi
服务通过ASPNETCORE_ENVIRONMENT: docker
指定启动环境为docker
,因此需要在项目下添加appsettings.docker.json
以配置应用参数:
|
{ |
|
"ConnectionStrings": { |
|
"DtmDemoWebApiContext": "Server=db;port=3306;database=dtm_barrier;user id=root;password=123456;AllowLoadLocalInfile=true" |
|
}, |
|
"TransferBaseURL": "http://dtmdemo.webapi/api/SagaDemo", |
|
"dtm": { |
|
"DtmUrl": "http://dtm:36789", |
|
"DtmTimeout": 10000, |
|
"BranchTimeout": 10000, |
|
"DBType": "mysql", |
|
"BarrierTableName": "dtm_barrier.barrier" |
|
} |
|
} |
|
另外db
服务中通过volumes: ["./docker/mysql/scripts:/docker-entrypoint-initdb.d"]
来挂载初始化脚本,以创建DTM依赖的MySQL 存储数据库dtm
和示例项目使用子事务屏障需要的barrier
数据表。脚本如下:
|
CREATE DATABASE IF NOT EXISTS dtm |
|
/*!40100 DEFAULT CHARACTER SET utf8mb4 */ |
|
; |
|
drop table IF EXISTS dtm.trans_global; |
|
CREATE TABLE if not EXISTS dtm.trans_global ( |
|
`id` bigint(22) NOT NULL AUTO_INCREMENT, |
|
`gid` varchar(128) NOT NULL COMMENT 'global transaction id', |
|
`trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg', |
|
`status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked', |
|
`query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow', |
|
`protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc', |
|
`create_time` datetime DEFAULT NULL, |
|
`update_time` datetime DEFAULT NULL, |
|
`finish_time` datetime DEFAULT NULL, |
|
`rollback_time` datetime DEFAULT NULL, |
|
`options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout', |
|
`custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction', |
|
`next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job', |
|
`next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job', |
|
`owner` varchar(128) not null default '' comment 'who is locking this trans', |
|
`ext_data` TEXT comment 'result for this trans. currently used in workflow pattern', |
|
`result` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction', |
|
`rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction', |
|
PRIMARY KEY (`id`), |
|
UNIQUE KEY `gid` (`gid`), |
|
key `owner`(`owner`), |
|
key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans' |
|
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; |
|
drop table IF EXISTS dtm.trans_branch_op; |
|
CREATE TABLE IF NOT EXISTS dtm.trans_branch_op ( |
|
`id` bigint(22) NOT NULL AUTO_INCREMENT, |
|
`gid` varchar(128) NOT NULL COMMENT 'global transaction id', |
|
`url` varchar(1024) NOT NULL COMMENT 'the url of this op', |
|
`data` TEXT COMMENT 'request body, depreceated', |
|
`bin_data` BLOB COMMENT 'request body', |
|
`branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID', |
|
`op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel', |
|
`status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed', |
|
`finish_time` datetime DEFAULT NULL, |
|
`rollback_time` datetime DEFAULT NULL, |
|
`create_time` datetime DEFAULT NULL, |
|
`update_time` datetime DEFAULT NULL, |
|
PRIMARY KEY (`id`), |
|
UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`) |
|
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; |
|
drop table IF EXISTS dtm.kv; |
|
CREATE TABLE IF NOT EXISTS dtm.kv ( |
|
`id` bigint(22) NOT NULL AUTO_INCREMENT, |
|
`cat` varchar(45) NOT NULL COMMENT 'the category of this data', |
|
`k` varchar(128) NOT NULL, |
|
`v` TEXT, |
|
`version` bigint(22) default 1 COMMENT 'version of the value', |
|
create_time datetime default NULL, |
|
update_time datetime DEFAULT NULL, |
|
PRIMARY KEY (`id`), |
|
UNIQUE key `uniq_k`(`cat`, `k`) |
|
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; |
|
|
create database if not exists dtm_barrier |
|
/*!40100 DEFAULT CHARACTER SET utf8mb4 */ |
|
; |
|
drop table if exists dtm_barrier.barrier; |
|
create table if not exists dtm_barrier.barrier( |
|
id bigint(22) PRIMARY KEY AUTO_INCREMENT, |
|
trans_type varchar(45) default '', |
|
gid varchar(128) default '', |
|
branch_id varchar(128) default '', |
|
op varchar(45) default '', |
|
barrier_id varchar(45) default '', |
|
reason varchar(45) default '' comment 'the branch type who insert this record', |
|
create_time datetime DEFAULT now(), |
|
update_time datetime DEFAULT now(), |
|
key(create_time), |
|
key(update_time), |
|
UNIQUE key(gid, branch_id, op, barrier_id) |
|
); |
如果您发现该资源为电子书等存在侵权的资源或对该资源描述不正确等,可点击“私信”按钮向作者进行反馈;如作者无回复可进行平台仲裁,我们会在第一时间进行处理!
加入交流群
请使用微信扫一扫!