Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: saga's tcc-like mode #651

Open
wants to merge 11 commits into
base: feature/saga
Choose a base branch
from
Empty file modified integrate_test.sh
100644 → 100755
Empty file.
2 changes: 2 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package client

import (
"github.com/seata/seata-go/pkg/rm/saga"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

格式不对,挪到第三课 import 里面

"sync"

"github.com/seata/seata-go/pkg/datasource"
Expand Down Expand Up @@ -89,6 +90,7 @@ func initRmClient(cfg *Config) {
tcc.InitTCC()
at.InitAT(cfg.ClientConfig.UndoConfig, cfg.AsyncWorkerConfig)
at.InitXA(cfg.ClientConfig.XaConfig)
saga.InitSaga()
})
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/constant/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ const (
RollbackMethod = "sys::rollback"
ActionName = "actionName"

ActionMethod = "sys::action"
CompensationMethod = "sys::compensationAction"

SeataXidKey = "SEATA_XID"
XidKey = "TX_XID"
XidKeyLowercase = "tx_xid"
Expand All @@ -38,5 +41,6 @@ const (

SeataVersion = "1.1.0"

TccBusinessActionContextParameter = "tccParam"
TccBusinessActionContextParameter = "tccParam"
SagaBusinessActionContextParameter = "sagaParam"
)
2 changes: 2 additions & 0 deletions pkg/datasource/sql/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (d *seataXADriver) OpenConnector(name string) (c driver.Connector, err erro
}, nil
}

// todo saga driver

type seataDriver struct {
branchType branch.BranchType
transType types.TransactionMode
Expand Down
45 changes: 45 additions & 0 deletions pkg/rm/saga/fence/fence_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package fence

import (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

格式太乱了

"context"
"database/sql"
"fmt"
"github.com/seata/seata-go/pkg/rm/saga/fence/handler"
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/tm"
)

// WithFence Execute the fence database operation first and then call back the business method
func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) {
if err = DoFence(ctx, tx); err != nil {
return err
}

if err := callback(); err != nil {
return fmt.Errorf("the business method error msg of: %p, [%w]", callback, err)
}

return
}

// DeFence This method is a suspended API interface that asserts the phase timing of a transaction
// and performs corresponding database operations to ensure transaction consistency
// case 1: if fencePhase is FencePhaseNotExist, will return a fence not found error.
// case 2: if fencePhase is FencePhaseAction, will do commit fence operation.
// case 3: if fencePhase is FencePhaseCompensationAction, will do rollback fence operation.
// case 4: if fencePhase not in above case, will return a fence phase illegal error.
func DoFence(ctx context.Context, tx *sql.Tx) error {
hd := handler.GetSagaFenceHandler()
phase := tm.GetFencePhase(ctx)

switch phase {
case enum.FencePhaseNotExist:
return fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx))
case enum.FencePhaseAction:
return hd.ActionFence(ctx, tx)
case enum.FencePhaseCompensationAction:
return hd.CompensationFence(ctx, tx)
}

return fmt.Errorf("fence phase: %v illegal", phase)
}
166 changes: 166 additions & 0 deletions pkg/rm/saga/fence/handler/saga_fence_wrapper_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package handler

import (
kaori-seasons marked this conversation as resolved.
Show resolved Hide resolved
"container/list"
"context"
"database/sql"
"fmt"
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/dao"
"github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/util/log"
"sync"
"time"
)

var (
fenceHandler *sagaFenceWrapperHandler
fenceOnce sync.Once
)

const (
maxQueueSize = 500
)

type sagaFenceWrapperHandler struct {
tccFenceDao dao.TCCFenceStore
logQueue chan *FenceLogIdentity
logCache list.List
logQueueOnce sync.Once
logQueueCloseOnce sync.Once
}

type FenceLogIdentity struct {
xid string
branchId int64
}

func GetSagaFenceHandler() *sagaFenceWrapperHandler {
if fenceHandler == nil {
fenceOnce.Do(func() {
fenceHandler = &sagaFenceWrapperHandler{
tccFenceDao: dao.GetTccFenceStoreDatabaseMapper(),
}
})
}
return fenceHandler
}

func (handler *sagaFenceWrapperHandler) ActionFence(ctx context.Context, tx *sql.Tx) error {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId

fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId)
if err != nil {
return fmt.Errorf(" commit fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
}
if fenceDo == nil {
return fmt.Errorf("tcc fence record not exists, commit fence method failed. xid= %s, branchId= %d", xid, branchId)
}

if fenceDo.Status == enum.StatusCommitted {
log.Infof("branch transaction has already committed before. idempotency rejected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
return nil
}
if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended {
// enable warn level
log.Warnf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status)
return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
}

return handler.updateFenceStatus(tx, xid, branchId, enum.StatusCommitted)
}

func (handler *sagaFenceWrapperHandler) CompensationFence(ctx context.Context, tx *sql.Tx) error {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId
actionName := tm.GetBusinessActionContext(ctx).ActionName
fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId)
if err != nil {
return fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
}

// record is null, mean the need suspend
if fenceDo == nil {
err = handler.insertSagaFenceLog(tx, xid, branchId, actionName, enum.StatusSuspended)
if err != nil {
return fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
}
log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId)
return nil
}

// have rollbacked or suspended
if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended {
// enable warn level
log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status)
return nil
}
if fenceDo.Status == enum.StatusCommitted {
log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
}

return handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked)
}

func (handler *sagaFenceWrapperHandler) insertSagaFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error {
tccFenceDo := model.TCCFenceDO{
Xid: xid,
BranchId: branchId,
ActionName: actionName,
Status: status,
}
return handler.tccFenceDao.InsertTCCFenceDO(tx, &tccFenceDo)
}

func (handler *sagaFenceWrapperHandler) updateFenceStatus(tx *sql.Tx, xid string, branchId int64, status enum.FenceStatus) error {
return handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status)
}

func (handler *sagaFenceWrapperHandler) InitLogCleanChannel() {
handler.logQueueOnce.Do(func() {
go handler.traversalCleanChannel()
})
}

func (handler *sagaFenceWrapperHandler) DestroyLogCleanChannel() {
handler.logQueueCloseOnce.Do(func() {
close(handler.logQueue)
})
}

func (handler *sagaFenceWrapperHandler) deleteFence(xid string, id int64) error {
// todo implement
return nil
}

func (handler *sagaFenceWrapperHandler) deleteFenceByDate(datetime time.Time) int32 {
// todo implement
return 0
}

func (handler *sagaFenceWrapperHandler) pushCleanChannel(xid string, branchId int64) {
// todo implement
fli := &FenceLogIdentity{
xid: xid,
branchId: branchId,
}
select {
case handler.logQueue <- fli:
// todo add batch delete from log cache.
default:
handler.logCache.PushBack(fli)
}
log.Infof("add one log to clean queue: %v ", fli)
}

func (handler *sagaFenceWrapperHandler) traversalCleanChannel() {
handler.logQueue = make(chan *FenceLogIdentity, maxQueueSize)
for li := range handler.logQueue {
if err := handler.deleteFence(li.xid, li.branchId); err != nil {
log.Errorf("delete fence log failed, xid: %s, branchId: &s", li.xid, li.branchId)
}
}
}
1 change: 1 addition & 0 deletions pkg/rm/saga/saga_driver_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package saga
Loading
Loading