diff --git a/integrate_test.sh b/integrate_test.sh old mode 100644 new mode 100755 diff --git a/pkg/client/client.go b/pkg/client/client.go index 54a56888b..b1107eef2 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -18,6 +18,7 @@ package client import ( + "github.com/seata/seata-go/pkg/rm/saga" "sync" "github.com/seata/seata-go/pkg/datasource" @@ -89,6 +90,7 @@ func initRmClient(cfg *Config) { tcc.InitTCC() at.InitAT(cfg.ClientConfig.UndoConfig, cfg.AsyncWorkerConfig) at.InitXA(cfg.ClientConfig.XaConfig) + saga.InitSaga() }) } diff --git a/pkg/constant/context.go b/pkg/constant/context.go index 5dd679c9b..67ec039f2 100644 --- a/pkg/constant/context.go +++ b/pkg/constant/context.go @@ -27,6 +27,9 @@ const ( RollbackMethod = "sys::rollback" ActionName = "actionName" + ActionMethod = "sys::action" + CompensationMethod = "sys::compensationAction" + SeataXidKey = "SEATA_XID" XidKey = "TX_XID" XidKeyLowercase = "tx_xid" @@ -38,5 +41,6 @@ const ( SeataVersion = "1.1.0" - TccBusinessActionContextParameter = "tccParam" + TccBusinessActionContextParameter = "tccParam" + SagaBusinessActionContextParameter = "sagaParam" ) diff --git a/pkg/datasource/sql/driver.go b/pkg/datasource/sql/driver.go index 165cead44..50e210557 100644 --- a/pkg/datasource/sql/driver.go +++ b/pkg/datasource/sql/driver.go @@ -102,6 +102,8 @@ func (d *seataXADriver) OpenConnector(name string) (c driver.Connector, err erro }, nil } +// todo saga driver + type seataDriver struct { branchType branch.BranchType transType types.TransactionMode diff --git a/pkg/rm/saga/fence/fence_api.go b/pkg/rm/saga/fence/fence_api.go new file mode 100644 index 000000000..231bb2b70 --- /dev/null +++ b/pkg/rm/saga/fence/fence_api.go @@ -0,0 +1,45 @@ +package fence + +import ( + "context" + "database/sql" + "fmt" + "github.com/seata/seata-go/pkg/rm/saga/fence/handler" + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" + "github.com/seata/seata-go/pkg/tm" +) + +// WithFence Execute the fence database operation first and then call back the business method +func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) { + if err = DoFence(ctx, tx); err != nil { + return err + } + + if err := callback(); err != nil { + return fmt.Errorf("the business method error msg of: %p, [%w]", callback, err) + } + + return +} + +// DeFence This method is a suspended API interface that asserts the phase timing of a transaction +// and performs corresponding database operations to ensure transaction consistency +// case 1: if fencePhase is FencePhaseNotExist, will return a fence not found error. +// case 2: if fencePhase is FencePhaseAction, will do commit fence operation. +// case 3: if fencePhase is FencePhaseCompensationAction, will do rollback fence operation. +// case 4: if fencePhase not in above case, will return a fence phase illegal error. +func DoFence(ctx context.Context, tx *sql.Tx) error { + hd := handler.GetSagaFenceHandler() + phase := tm.GetFencePhase(ctx) + + switch phase { + case enum.FencePhaseNotExist: + return fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx)) + case enum.FencePhaseAction: + return hd.ActionFence(ctx, tx) + case enum.FencePhaseCompensationAction: + return hd.CompensationFence(ctx, tx) + } + + return fmt.Errorf("fence phase: %v illegal", phase) +} diff --git a/pkg/rm/saga/fence/handler/saga_fence_wrapper_handler.go b/pkg/rm/saga/fence/handler/saga_fence_wrapper_handler.go new file mode 100644 index 000000000..9c52c7890 --- /dev/null +++ b/pkg/rm/saga/fence/handler/saga_fence_wrapper_handler.go @@ -0,0 +1,166 @@ +package handler + +import ( + "container/list" + "context" + "database/sql" + "fmt" + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" + "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/dao" + "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model" + "github.com/seata/seata-go/pkg/tm" + "github.com/seata/seata-go/pkg/util/log" + "sync" + "time" +) + +var ( + fenceHandler *sagaFenceWrapperHandler + fenceOnce sync.Once +) + +const ( + maxQueueSize = 500 +) + +type sagaFenceWrapperHandler struct { + tccFenceDao dao.TCCFenceStore + logQueue chan *FenceLogIdentity + logCache list.List + logQueueOnce sync.Once + logQueueCloseOnce sync.Once +} + +type FenceLogIdentity struct { + xid string + branchId int64 +} + +func GetSagaFenceHandler() *sagaFenceWrapperHandler { + if fenceHandler == nil { + fenceOnce.Do(func() { + fenceHandler = &sagaFenceWrapperHandler{ + tccFenceDao: dao.GetTccFenceStoreDatabaseMapper(), + } + }) + } + return fenceHandler +} + +func (handler *sagaFenceWrapperHandler) ActionFence(ctx context.Context, tx *sql.Tx) error { + xid := tm.GetBusinessActionContext(ctx).Xid + branchId := tm.GetBusinessActionContext(ctx).BranchId + + fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId) + if err != nil { + return fmt.Errorf(" commit fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) + } + if fenceDo == nil { + return fmt.Errorf("tcc fence record not exists, commit fence method failed. xid= %s, branchId= %d", xid, branchId) + } + + if fenceDo.Status == enum.StatusCommitted { + log.Infof("branch transaction has already committed before. idempotency rejected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) + return nil + } + if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended { + // enable warn level + log.Warnf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status) + return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) + } + + return handler.updateFenceStatus(tx, xid, branchId, enum.StatusCommitted) +} + +func (handler *sagaFenceWrapperHandler) CompensationFence(ctx context.Context, tx *sql.Tx) error { + xid := tm.GetBusinessActionContext(ctx).Xid + branchId := tm.GetBusinessActionContext(ctx).BranchId + actionName := tm.GetBusinessActionContext(ctx).ActionName + fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId) + if err != nil { + return fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) + } + + // record is null, mean the need suspend + if fenceDo == nil { + err = handler.insertSagaFenceLog(tx, xid, branchId, actionName, enum.StatusSuspended) + if err != nil { + return fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) + } + log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId) + return nil + } + + // have rollbacked or suspended + if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended { + // enable warn level + log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status) + return nil + } + if fenceDo.Status == enum.StatusCommitted { + log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) + return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) + } + + return handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked) +} + +func (handler *sagaFenceWrapperHandler) insertSagaFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error { + tccFenceDo := model.TCCFenceDO{ + Xid: xid, + BranchId: branchId, + ActionName: actionName, + Status: status, + } + return handler.tccFenceDao.InsertTCCFenceDO(tx, &tccFenceDo) +} + +func (handler *sagaFenceWrapperHandler) updateFenceStatus(tx *sql.Tx, xid string, branchId int64, status enum.FenceStatus) error { + return handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status) +} + +func (handler *sagaFenceWrapperHandler) InitLogCleanChannel() { + handler.logQueueOnce.Do(func() { + go handler.traversalCleanChannel() + }) +} + +func (handler *sagaFenceWrapperHandler) DestroyLogCleanChannel() { + handler.logQueueCloseOnce.Do(func() { + close(handler.logQueue) + }) +} + +func (handler *sagaFenceWrapperHandler) deleteFence(xid string, id int64) error { + // todo implement + return nil +} + +func (handler *sagaFenceWrapperHandler) deleteFenceByDate(datetime time.Time) int32 { + // todo implement + return 0 +} + +func (handler *sagaFenceWrapperHandler) pushCleanChannel(xid string, branchId int64) { + // todo implement + fli := &FenceLogIdentity{ + xid: xid, + branchId: branchId, + } + select { + case handler.logQueue <- fli: + // todo add batch delete from log cache. + default: + handler.logCache.PushBack(fli) + } + log.Infof("add one log to clean queue: %v ", fli) +} + +func (handler *sagaFenceWrapperHandler) traversalCleanChannel() { + handler.logQueue = make(chan *FenceLogIdentity, maxQueueSize) + for li := range handler.logQueue { + if err := handler.deleteFence(li.xid, li.branchId); err != nil { + log.Errorf("delete fence log failed, xid: %s, branchId: &s", li.xid, li.branchId) + } + } +} diff --git a/pkg/rm/saga/saga_driver_conn.go b/pkg/rm/saga/saga_driver_conn.go new file mode 100644 index 000000000..4def0c597 --- /dev/null +++ b/pkg/rm/saga/saga_driver_conn.go @@ -0,0 +1 @@ +package saga diff --git a/pkg/rm/saga/saga_resource.go b/pkg/rm/saga/saga_resource.go new file mode 100644 index 000000000..93fd4bd28 --- /dev/null +++ b/pkg/rm/saga/saga_resource.go @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package saga + +import ( + "context" + "encoding/json" + "fmt" + "github.com/seata/seata-go/pkg/constant" + "github.com/seata/seata-go/pkg/protocol/branch" + "github.com/seata/seata-go/pkg/rm" + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" + "github.com/seata/seata-go/pkg/tm" + "github.com/seata/seata-go/pkg/util/log" + "reflect" + "sync" +) + +var ( + sagaResourceManager *SagaResourceManager + onceSagaReosurceManager = &sync.Once{} +) + +func InitSaga() { + + rm.GetRmCacheInstance().RegisterResourceManager(GetSagaResourceManagerInstance()) +} + +func GetSagaResourceManagerInstance() *SagaResourceManager { + if sagaResourceManager == nil { + onceSagaReosurceManager.Do(func() { + + sagaResourceManager = &SagaResourceManager{ + resourceManagerMap: sync.Map{}, + rmRemoting: rm.GetRMRemotingInstance(), + } + }) + } + return sagaResourceManager +} + +type SagaResource struct { + ResourceGroupId string `default:"DEFAULT"` + AppName string + actionName string + //一阶段提交的方法 + prepareMethod reflect.Method + //与tcc事务不同saga事务只有回滚才需要二阶段 + rollbackMethod reflect.Method + rollbackArgsClasses reflect.Kind + *rm.SagaAction +} + +func ParseSagaResource(v interface{}) (*SagaResource, error) { + t, err := rm.ParseSagaTwoPhaseAction(v) + if err != nil { + log.Errorf("%#v is not tcc two phase service, %s", v, err.Error()) + return nil, err + } + return &SagaResource{ + // todo read from config + ResourceGroupId: `default:"DEFAULT"`, + AppName: "seata-go-mock-app-name", + SagaAction: t, + }, nil +} + +func (t *SagaResource) GetResourceId() string { + return t.SagaAction.GetSagaActionName() +} + +func (t *SagaResource) GetBranchType() branch.BranchType { + return branch.BranchTypeTCC +} + +func (saga *SagaResource) GetResourceGroupId() string { + return saga.ResourceGroupId +} + +func (saga *SagaResource) GetAppName() string { + return saga.AppName +} + +func (saga *SagaResource) ActionName() string { + return saga.actionName +} + +func (saga *SagaResource) SetActionName(actionName string) { + saga.actionName = actionName +} + +func (saga *SagaResource) PrepareMethod() reflect.Method { + return saga.prepareMethod +} + +func (saga *SagaResource) SetPrepareMethod(prepareMethod reflect.Method) { + saga.prepareMethod = prepareMethod +} + +func (saga *SagaResource) RollbackMethod() reflect.Method { + return saga.rollbackMethod +} + +func (saga *SagaResource) SetRollbackMethod(rollbackMethod reflect.Method) { + saga.rollbackMethod = rollbackMethod +} + +func (saga *SagaResource) RollbackArgsClasses() reflect.Kind { + return saga.rollbackArgsClasses +} + +func (saga *SagaResource) SetRollbackArgsClasses(rollbackArgsClasses reflect.Kind) { + saga.rollbackArgsClasses = rollbackArgsClasses +} + +//需要单独抽象一套出来 但是没法沿用tcc的接口 +type SagaResourceManager struct { + rmRemoting *rm.RMRemoting + resourceManagerMap sync.Map +} + +func (saga *SagaResourceManager) getBusinessAction(xid string, branchID int64, resourceID string, applicationData []byte) *tm.BusinessActionContext { + actionMap := make(map[string]interface{}, 2) + if len(applicationData) > 0 { + var sagaContext map[string]interface{} + if err := json.Unmarshal(applicationData, &sagaContext); err != nil { + fmt.Errorf("application data failed load %s", applicationData) + } + if v, ok := sagaContext[constant.ActionContext]; ok { + actionMap = v.(map[string]interface{}) + } + } + + return &tm.BusinessActionContext{ + Xid: xid, + BranchId: branchID, + ActionName: resourceID, + ActionContext: actionMap, + } +} + +func (saga *SagaResourceManager) BranchCommit(ctx context.Context, branchResource rm.BranchResource) (branch.BranchStatus, error) { + var sagaReSource *SagaResource + if resource, ok := saga.resourceManagerMap.Load(branchResource.ResourceId); !ok { + err := fmt.Errorf("TCC Resource is not exist, resourceId: %s", branchResource.ResourceId) + return 0, err + } else { + sagaReSource, _ = resource.(*SagaResource) + } + + //需要拿到此时action的名称 + businessContext := saga.getBusinessAction(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData) + //携带事务回滚,提交的状态 + ctx = tm.InitSeataContext(ctx) + tm.SetXID(ctx, branchResource.Xid) + //设置回滚事务时,防悬挂 + tm.SetFencePhase(ctx, enum.FencePhaseAction) + tm.SetBusinessActionContext(ctx, businessContext) + //getBusinessActionContext + //一阶段直接提交 + _, err := sagaReSource.SagaAction.Action(ctx, businessContext) + + return branch.BranchStatusPhasetwoCommitted, err +} + +func (saga *SagaResourceManager) BranchRollback(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) { + + var sagaResource *SagaResource + if resource, ok := saga.resourceManagerMap.Load(resource.ResourceId); !ok { + err := fmt.Errorf("xxxx") + return 0, err + } else { + sagaResource, _ = resource.(*SagaResource) + } + + businessActionContext := saga.getBusinessAction(resource.Xid, resource.BranchId, resource.ResourceId, resource.ApplicationData) + + ctx = tm.InitSeataContext(ctx) + tm.SetXID(ctx, resource.Xid) + //设置回滚事务时,防悬挂 + tm.SetFencePhase(ctx, enum.FencePhaseCompensationAction) + tm.SetBusinessActionContext(ctx, businessActionContext) + _, err := sagaResource.SagaAction.Compensation(ctx, businessActionContext) + return branch.BranchStatusPhasetwoRollbackFailedRetryable, err +} + +func (saga *SagaResourceManager) BranchRegister(ctx context.Context, param rm.BranchRegisterParam) (int64, error) { + return saga.rmRemoting.BranchRegister(param) +} + +// BranchReport branch transaction report the status +func (saga *SagaResourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error { + return nil +} + +// LockQuery lock query boolean +func (saga *SagaResourceManager) LockQuery(ctx context.Context, param rm.LockQueryParam) (bool, error) { + return false, nil +} + +// GetCachedResources get all resources managed by this manager +func (saga *SagaResourceManager) GetCachedResources() *sync.Map { + return nil +} + +// RegisterResource register a Resource to be managed by Resource Manager +func (saga *SagaResourceManager) RegisterResource(res rm.Resource) error { + return nil +} + +// UnregisterResource unregister a Resource from the Resource Manager +func (saga *SagaResourceManager) UnregisterResource(res rm.Resource) error { + return nil +} + +func (saga *SagaResourceManager) GetBranchType() branch.BranchType { + return branch.BranchTypeSAGA +} diff --git a/pkg/rm/saga/saga_service.go b/pkg/rm/saga/saga_service.go new file mode 100644 index 000000000..c5fbf8d16 --- /dev/null +++ b/pkg/rm/saga/saga_service.go @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package saga + +import ( + "context" + "encoding/json" + "fmt" + gostnet "github.com/dubbogo/gost/net" + "github.com/seata/seata-go/pkg/constant" + "github.com/seata/seata-go/pkg/protocol/branch" + "github.com/seata/seata-go/pkg/rm" + "github.com/seata/seata-go/pkg/tm" + "github.com/seata/seata-go/pkg/util/log" + "reflect" + "sync" + "time" +) + +type SagaServiceProxy struct { + referenceName string + registerResourceOnce sync.Once + *SagaResource +} + +func NewSagaServiceProxy(service interface{}) (*SagaServiceProxy, error) { + sagaResource, err := ParseSagaResource(service) + if err != nil { + log.Errorf("invalid saga service, err %v", err) + } + proxy := &SagaServiceProxy{ + SagaResource: sagaResource, + } + return proxy, proxy.RegisterResource() +} + +func (proxy *SagaServiceProxy) RegisterResource() error { + var err error + proxy.registerResourceOnce.Do(func() { + err = rm.GetRmCacheInstance().GetResourceManager(branch.BranchTypeSAGA).RegisterResource(proxy.SagaResource) + if err != nil { + log.Errorf("NewSagaServiceProxy RegisterResource error: %#v", err.Error()) + } + }) + return err +} + +func (proxy *SagaServiceProxy) initActionContext(params interface{}) map[string]interface{} { + actionContext := proxy.getActionContextParameters(params) + actionContext[constant.ActionStartTime] = time.Now().UnixNano() / 1e6 + actionContext[constant.ActionMethod] = proxy.SagaResource.SagaAction.GetNormalActionName() + actionContext[constant.CompensationMethod] = proxy.SagaResource.SagaAction.GetCompensationName() + actionContext[constant.ActionName] = proxy.SagaResource.SagaAction.GetActionName(params) + actionContext[constant.HostName], _ = gostnet.GetLocalIP() + return actionContext +} + +// getOrCreateBusinessActionContext When the parameters of the prepare method are the following scenarios, obtain the context in the following ways: +// 1. null: create new BusinessActionContext +// 2. tm.BusinessActionContext: return it +// 3. *tm.BusinessActionContext: if nil then create new BusinessActionContext, else return it +// 4. Struct: if there is an attribute of businessactioncontext enum and it is not nil, return it +// 5. else: create new BusinessActionContext +func (proxy *SagaServiceProxy) getOrCreateBusinessActionContext(params interface{}) *tm.BusinessActionContext { + if params == nil { + return &tm.BusinessActionContext{} + } + + switch params.(type) { + case tm.BusinessActionContext: + v := params.(tm.BusinessActionContext) + return &v + case *tm.BusinessActionContext: + v := params.(*tm.BusinessActionContext) + if v != nil { + return v + } + return &tm.BusinessActionContext{} + default: + break + } + + var ( + typ reflect.Type + val reflect.Value + isStruct bool + ) + if isStruct, val, typ = obtainStructValueType(params); !isStruct { + return &tm.BusinessActionContext{} + } + n := typ.NumField() + for i := 0; i < n; i++ { + sf := typ.Field(i) + if sf.Type == rm.TypBusinessContextInterface { + v := val.Field(i).Interface() + if v != nil { + return v.(*tm.BusinessActionContext) + } + } + if sf.Type == reflect.TypeOf(tm.BusinessActionContext{}) && val.Field(i).CanInterface() { + v := val.Field(i).Interface().(tm.BusinessActionContext) + return &v + } + } + return &tm.BusinessActionContext{} +} + +func (proxy *SagaServiceProxy) registerBranch(ctx context.Context, params interface{}) error { + if !tm.IsGlobalTx(ctx) { + errStr := "BranchRegister error, transaction should be opened" + log.Errorf(errStr) + return fmt.Errorf(errStr) + } + + sagaContext := proxy.initBusinessActionContext(ctx, params) + actionContext := proxy.initActionContext(params) + for k, v := range actionContext { + sagaContext.ActionContext[k] = v + } + + applicationData, _ := json.Marshal(map[string]interface{}{ + constant.ActionContext: actionContext, + }) + branchId, err := rm.GetRMRemotingInstance().BranchRegister(rm.BranchRegisterParam{ + BranchType: branch.BranchTypeSAGA, + ResourceId: proxy.GetSagaActionName(), + ClientId: "", + Xid: tm.GetXID(ctx), + ApplicationData: string(applicationData), + LockKeys: "", + }) + if err != nil { + log.Errorf("register branch transaction error %s ", err.Error()) + return err + } + sagaContext.BranchId = branchId + tm.SetBusinessActionContext(ctx, sagaContext) + return nil +} + +func (proxy *SagaServiceProxy) getActionContextParameters(params interface{}) map[string]interface{} { + var ( + actionContext = make(map[string]interface{}, 0) + typ reflect.Type + val reflect.Value + isStruct bool + ) + if params == nil { + return actionContext + } + if isStruct, val, typ = obtainStructValueType(params); !isStruct { + return actionContext + } + for i := 0; i < typ.NumField(); i++ { + // skip unexported anonymous filed + if typ.Field(i).PkgPath != "" { + continue + } + structField := typ.Field(i) + // skip ignored field + tagVal, hasTag := structField.Tag.Lookup(constant.SagaBusinessActionContextParameter) + if !hasTag || tagVal == `-` || tagVal == "" { + continue + } + actionContext[tagVal] = val.Field(i).Interface() + } + return actionContext +} + +// initBusinessActionContext init saga context +func (t *SagaServiceProxy) initBusinessActionContext(ctx context.Context, params interface{}) *tm.BusinessActionContext { + sagaContext := t.getOrCreateBusinessActionContext(params) + sagaContext.Xid = tm.GetXID(ctx) + sagaContext.ActionName = t.GetSagaActionName() + // todo read from config file + sagaContext.IsDelayReport = true + if sagaContext.ActionContext == nil { + sagaContext.ActionContext = make(map[string]interface{}, 0) + } + return sagaContext +} + +// obtainStructValueType check o is struct or pointer enum +func obtainStructValueType(o interface{}) (bool, reflect.Value, reflect.Type) { + v := reflect.ValueOf(o) + t := reflect.TypeOf(o) + switch v.Kind() { + case reflect.Struct: + return true, v, t + case reflect.Ptr: + return true, v.Elem(), t.Elem() + default: + return false, v, nil + } +} + +func (proxy *SagaServiceProxy) GetTransactionInfo() tm.GtxConfig { + // todo replace with config + return tm.GtxConfig{ + Timeout: time.Second * 10, + Name: proxy.GetSagaActionName(), + // Propagation, Propagation + // LockRetryInternal, int64 + // LockRetryTimes int64 + } +} diff --git a/pkg/rm/saga/saga_service_test.go b/pkg/rm/saga/saga_service_test.go new file mode 100644 index 000000000..169f9181f --- /dev/null +++ b/pkg/rm/saga/saga_service_test.go @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package saga + +import ( + "context" + "fmt" + "github.com/agiledragon/gomonkey" + gostnet "github.com/dubbogo/gost/net" + "github.com/seata/seata-go/pkg/constant" + "github.com/seata/seata-go/pkg/rm" + "github.com/seata/seata-go/pkg/tm" + "github.com/seata/seata-go/pkg/util/log" + testdata2 "github.com/seata/seata-go/testdata" + "github.com/stretchr/testify/assert" + "os" + "reflect" + "sync" + "testing" + "time" +) + +var ( + testSagaServiceProxy *SagaServiceProxy + testBranchID = int64(121324345353) + names []interface{} + values = make([]reflect.Value, 0, 2) +) + +type UserProvider struct { + Action func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"action"` + Compensation func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"compensation"` +} + +func InitMock() { + log.Init() + var ( + registerResource = func(_ *SagaServiceProxy) error { + return nil + } + branchRegister = func(_ *rm.RMRemoting, param rm.BranchRegisterParam) (int64, error) { + return testBranchID, nil + } + ) + log.Infof("run init mock") + gomonkey.ApplyMethod(reflect.TypeOf(testSagaServiceProxy), "RegisterResource", registerResource) + gomonkey.ApplyMethod(reflect.TypeOf(rm.GetRMRemotingInstance()), "BranchRegister", branchRegister) + testSagaServiceProxy, _ = NewSagaServiceProxy(GetTestTwoPhaseService()) +} + +func TestMain(m *testing.M) { + InitMock() + code := m.Run() + os.Exit(code) +} + +func TestInitActionContext(t *testing.T) { + param := struct { + name string `sagaParam:"name"` + Age int64 `sagaParam:""` + Addr string `sagaParam:"addr"` + Job string `sagaParam:"-"` + Class string + Other []int8 `sagaParam:"Other"` + }{ + name: "Jack", + Age: 20, + Addr: "Earth", + Job: "Dor", + Class: "1-2", + Other: []int8{1, 2, 3}, + } + + now := time.Now() + p := gomonkey.ApplyFunc(time.Now, func() time.Time { + return now + }) + defer p.Reset() + result := testSagaServiceProxy.initActionContext(param) + localIp, _ := gostnet.GetLocalIP() + assert.Equal(t, map[string]interface{}{ + "addr": "Earth", + "Other": []int8{1, 2, 3}, + constant.ActionStartTime: now.UnixNano() / 1e6, + constant.ActionMethod: "Action", + constant.CompensationMethod: "Compensation", + constant.ActionName: testdata2.ActionName, + constant.HostName: localIp, + }, result) +} + +func TestGetOrCreateBusinessActionContext(t *testing.T) { + tests := []struct { + param interface{} + want tm.BusinessActionContext + }{ + { + param: nil, + want: tm.BusinessActionContext{}, + }, + { + param: tm.BusinessActionContext{ + ActionContext: map[string]interface{}{ + "name": "Jack", + "age": 12, + }, + }, + want: tm.BusinessActionContext{ + ActionContext: map[string]interface{}{ + "name": "Jack", + "age": 12, + }, + }, + }, + { + param: &tm.BusinessActionContext{ + ActionContext: map[string]interface{}{ + "name": "Jack", + "age": 13, + }, + }, + want: tm.BusinessActionContext{ + ActionContext: map[string]interface{}{ + "name": "Jack", + "age": 13, + }, + }, + }, + { + param: struct { + Context *tm.BusinessActionContext + }{ + Context: &tm.BusinessActionContext{ + ActionContext: map[string]interface{}{ + "name": "Jack", + "age": 14, + }, + }, + }, + want: tm.BusinessActionContext{ + ActionContext: map[string]interface{}{ + "name": "Jack", + "age": 14, + }, + }, + }, + { + param: struct { + Context tm.BusinessActionContext + }{ + Context: tm.BusinessActionContext{ + ActionContext: map[string]interface{}{ + "name": "Jack", + "age": 12, + }, + }, + }, + want: tm.BusinessActionContext{ + ActionContext: map[string]interface{}{ + "name": "Jack", + "age": 12, + }, + }, + }, + { + param: struct { + context tm.BusinessActionContext + }{ + context: tm.BusinessActionContext{ + ActionContext: map[string]interface{}{ + "name": "Jack", + "age": 12, + }, + }, + }, + want: tm.BusinessActionContext{}, + }, + } + + for _, tt := range tests { + result := testSagaServiceProxy.getOrCreateBusinessActionContext(tt.param) + assert.Equal(t, tt.want, *result) + } +} + +func TestRegisteBranch(t *testing.T) { + ctx := testdata2.GetTestContext() + err := testSagaServiceProxy.registerBranch(ctx, nil) + assert.Nil(t, err) + bizContext := tm.GetBusinessActionContext(ctx) + assert.Equal(t, testBranchID, bizContext.BranchId) +} + +func TestNewTCCServiceProxy(t *testing.T) { + type args struct { + service interface{} + } + + userProvider := &UserProvider{} + args1 := args{service: userProvider} + args2 := args{service: userProvider} + + twoPhaseAction1, _ := rm.ParseSagaTwoPhaseAction(userProvider) + twoPhaseAction2, _ := rm.ParseSagaTwoPhaseAction(userProvider) + + tests := []struct { + name string + args args + want *SagaServiceProxy + wantErr assert.ErrorAssertionFunc + }{ + { + "test1", args1, &SagaServiceProxy{ + SagaResource: &SagaResource{ + ResourceGroupId: `default:"DEFAULT"`, + AppName: "seata-go-mock-app-name", + SagaAction: twoPhaseAction1, + }, + }, assert.NoError, + }, + { + "test2", args2, &SagaServiceProxy{ + SagaResource: &SagaResource{ + ResourceGroupId: `default:"DEFAULT"`, + AppName: "seata-go-mock-app-name", + SagaAction: twoPhaseAction2, + }, + }, assert.NoError, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewSagaServiceProxy(tt.args.service) + if !tt.wantErr(t, err, fmt.Sprintf("NewTCCServiceProxy(%v)", tt.args.service)) { + return + } + assert.Equalf(t, tt.want, got, "NewTCCServiceProxy(%v)", tt.args.service) + }) + } +} + +func TestTCCGetTransactionInfo(t1 *testing.T) { + type fields struct { + referenceName string + registerResourceOnce sync.Once + SagaResource *SagaResource + } + + userProvider := &UserProvider{} + twoPhaseAction1, _ := rm.ParseSagaTwoPhaseAction(userProvider) + + tests := struct { + name string + fields fields + want tm.GtxConfig + }{ + "test1", + fields{ + referenceName: "test1", + registerResourceOnce: sync.Once{}, + SagaResource: &SagaResource{ + ResourceGroupId: "default1", + AppName: "app1", + SagaAction: twoPhaseAction1, + }, + }, + tm.GtxConfig{Name: "TwoPhaseDemoService", Timeout: time.Second * 10, Propagation: 0, LockRetryInternal: 0, LockRetryTimes: 0}, + } + + t1.Run(tests.name, func(t1 *testing.T) { + t := &SagaServiceProxy{ + referenceName: tests.fields.referenceName, + registerResourceOnce: sync.Once{}, + SagaResource: tests.fields.SagaResource, + } + assert.Equalf(t1, tests.want, t.GetTransactionInfo(), "GetTransactionInfo()") + }) +} + +func GetTestTwoPhaseService() rm.SagaActionInterface { + return &testdata2.TestSagaTwoPhaseService{} +} diff --git a/pkg/rm/saga_two_phase.go b/pkg/rm/saga_two_phase.go new file mode 100644 index 000000000..fd401d567 --- /dev/null +++ b/pkg/rm/saga_two_phase.go @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rm + +import ( + "context" + "fmt" + "github.com/seata/seata-go/pkg/tm" + "reflect" +) + +// Define saga mode action, compensationAction branch transaction +// submission, compensation two actions + +const ( + TwoPhaseActionActionTagVal = "action" + TwoPhaseActionCompensationTagVal = "compensation" +) + +type SagaActionInterface interface { + Action(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) + + Compensation(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) +} + +type SagaAction struct { + sagaTwoPhaseService interface{} + action string + normalActionName string + normalAction *reflect.Value + compensationName string + compensation *reflect.Value +} + +func ParseSagaTwoPhaseAction(v interface{}) (*SagaAction, error) { + if m, ok := v.(TwoPhaseInterface); ok { + return parseSagaTwoPhaseActionByTwoPhaseInterface(m), nil + } + return ParseSagaTwoPhaseActionByInterface(v) +} + +func parseSagaTwoPhaseActionByTwoPhaseInterface(v TwoPhaseInterface) *SagaAction { + value := reflect.ValueOf(v) + ma := value.MethodByName("action") + mc := value.MethodByName("compensation") + return &SagaAction{ + sagaTwoPhaseService: v, + action: v.GetActionName(), + normalActionName: "Prepare", + normalAction: &ma, + compensationName: "Commit", + compensation: &mc, + } +} + +func ParseSagaTwoPhaseActionByInterface(v interface{}) (*SagaAction, error) { + valueOfElem := reflect.ValueOf(v).Elem() + typeOf := valueOfElem.Type() + k := typeOf.Kind() + if k != reflect.Struct { + return nil, fmt.Errorf("param should be a struct, instead of a pointer") + } + numField := typeOf.NumField() + + var ( + hasNormalMethodName bool + hasCompensationMethod bool + twoPhaseName string + result = SagaAction{ + sagaTwoPhaseService: v, + } + ) + for i := 0; i < numField; i++ { + t := typeOf.Field(i) + f := valueOfElem.Field(i) + if ms, m, ok := getNormalAction(t, f); ok { + hasNormalMethodName = true + result.normalAction = m + result.normalActionName = ms + } else if ms, m, ok = getCompensationMethod(t, f); ok { + hasCompensationMethod = true + result.compensation = m + result.compensationName = ms + } + } + if !hasNormalMethodName { + return nil, fmt.Errorf("missing normal method") + } + if !hasCompensationMethod { + return nil, fmt.Errorf("missing compensation method") + } + twoPhaseName = getActionName(v) + if twoPhaseName == "" { + return nil, fmt.Errorf("missing two phase name") + } + result.action = twoPhaseName + return &result, nil +} + +func getNormalAction(t reflect.StructField, f reflect.Value) (string, *reflect.Value, bool) { + if t.Tag.Get(TwoPhaseActionTag) != TwoPhaseActionActionTagVal { + return "", nil, false + } + if f.Kind() != reflect.Func || !f.IsValid() { + return "", nil, false + } + // prepare has 2 return error value + if outNum := t.Type.NumOut(); outNum != 2 { + return "", nil, false + } + if returnType := t.Type.Out(0); returnType != typBool { + return "", nil, false + } + if returnType := t.Type.Out(1); returnType != typError { + return "", nil, false + } + // prepared method has at least 1 params, context.Context, and other params + if inNum := t.Type.NumIn(); inNum == 0 { + return "", nil, false + } + if inType := t.Type.In(0); inType != typContext { + return "", nil, false + } + return t.Name, &f, true +} + +func getCompensationMethod(t reflect.StructField, f reflect.Value) (string, *reflect.Value, bool) { + if t.Tag.Get(TwoPhaseActionTag) != TwoPhaseActionCompensationTagVal { + return "", nil, false + } + if f.Kind() != reflect.Func || !f.IsValid() { + return "", nil, false + } + // rollback method has 2 return value + if outNum := t.Type.NumOut(); outNum != 2 { + return "", nil, false + } + if returnType := t.Type.Out(0); returnType != typBool { + return "", nil, false + } + if returnType := t.Type.Out(1); returnType != typError { + return "", nil, false + } + // rollback method has at least 1 params, context.Context, and other params + if inNum := t.Type.NumIn(); inNum != 2 { + return "", nil, false + } + if inType := t.Type.In(0); inType != typContext { + return "", nil, false + } + if inType := t.Type.In(1); inType != TypBusinessContextInterface { + return "", nil, false + } + return t.Name, &f, true +} + +func (sagaAction *SagaAction) GetNormalActionName() string { + return sagaAction.normalActionName +} + +func (sagaAction *SagaAction) GetCompensationName() string { + return sagaAction.compensationName +} + +func (sagaAction *SagaAction) Action(ctx context.Context, param interface{}) (bool, error) { + values := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(param)} + res := sagaAction.normalAction.Call(values) + var ( + r0 = res[0].Interface() + r1 = res[1].Interface() + res0 bool + res1 error + ) + if r0 != nil { + res0 = r0.(bool) + } + if r1 != nil { + res1 = r1.(error) + } + return res0, res1 +} + +//透传上下文参数 +func (sagaAction *SagaAction) Compensation(ctx context.Context, actionContext *tm.BusinessActionContext) (bool, error) { + values := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(actionContext)} + res := sagaAction.normalAction.Call(values) + var ( + r0 = res[0].Interface() + r1 = res[1].Interface() + res0 bool + res1 error + ) + if r0 != nil { + res0 = r0.(bool) + } + if r1 != nil { + res1 = r1.(error) + } + return res0, res1 +} + +func (SagaAction *SagaAction) GetSagaActionName() string { + return SagaAction.action +} + +func (sagaAction *SagaAction) GetActionName(v interface{}) string { + var ( + actionName string + valueOf = reflect.ValueOf(v) + valueOfElem = valueOf.Elem() + typeOf = valueOfElem.Type() + ) + if typeOf.Kind() != reflect.Struct { + return "" + } + numField := valueOfElem.NumField() + for i := 0; i < numField; i++ { + t := typeOf.Field(i) + if actionName = t.Tag.Get(TwoPhaseActionNameTag); actionName != "" { + break + } + } + return actionName +} diff --git a/pkg/rm/tcc/fence/enum/fence_phase.go b/pkg/rm/tcc/fence/enum/fence_phase.go index c056100df..10edb84a5 100644 --- a/pkg/rm/tcc/fence/enum/fence_phase.go +++ b/pkg/rm/tcc/fence/enum/fence_phase.go @@ -32,4 +32,9 @@ const ( // FencePhaseRollback rollback fence phase FencePhaseRollback = FencePhase(3) + + // FencePhaseAction One stage submission + FencePhaseAction = FencePhase(4) + // FencePhaseCompensationAction Compensation status during one-stage transaction rollback + FencePhaseCompensationAction = FencePhase(5) ) diff --git a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go index 5a0854d9f..984bcae58 100644 --- a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go +++ b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go @@ -23,11 +23,10 @@ import ( "database/sql" "errors" "fmt" + "github.com/go-sql-driver/mysql" "sync" "time" - "github.com/go-sql-driver/mysql" - "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/dao" "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model" diff --git a/pkg/rm/tcc/tcc_service.go b/pkg/rm/tcc/tcc_service.go index c65610380..3d7a795e3 100644 --- a/pkg/rm/tcc/tcc_service.go +++ b/pkg/rm/tcc/tcc_service.go @@ -78,7 +78,7 @@ func (t *TCCServiceProxy) Reference() string { func (t *TCCServiceProxy) Prepare(ctx context.Context, params interface{}) (interface{}, error) { if tm.IsGlobalTx(ctx) { - err := t.registeBranch(ctx, params) + err := t.registerBranch(ctx, params) if err != nil { return nil, err } @@ -89,8 +89,8 @@ func (t *TCCServiceProxy) Prepare(ctx context.Context, params interface{}) (inte return t.TCCResource.Prepare(ctx, params) } -// registeBranch send register branch transaction request -func (t *TCCServiceProxy) registeBranch(ctx context.Context, params interface{}) error { +// registerBranch send register branch transaction request +func (t *TCCServiceProxy) registerBranch(ctx context.Context, params interface{}) error { if !tm.IsGlobalTx(ctx) { errStr := "BranchRegister error, transaction should be opened" log.Errorf(errStr) diff --git a/pkg/rm/tcc/tcc_service_test.go b/pkg/rm/tcc/tcc_service_test.go index 34b0c0dd8..d1ee8e408 100644 --- a/pkg/rm/tcc/tcc_service_test.go +++ b/pkg/rm/tcc/tcc_service_test.go @@ -237,7 +237,7 @@ func TestGetOrCreateBusinessActionContext(t *testing.T) { func TestRegisteBranch(t *testing.T) { ctx := testdata2.GetTestContext() - err := testTccServiceProxy.registeBranch(ctx, nil) + err := testTccServiceProxy.registerBranch(ctx, nil) assert.Nil(t, err) bizContext := tm.GetBusinessActionContext(ctx) assert.Equal(t, testBranchID, bizContext.BranchId) diff --git a/testdata/mock_saga.go b/testdata/mock_saga.go new file mode 100644 index 000000000..9bbf5c77b --- /dev/null +++ b/testdata/mock_saga.go @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package testdata + +import ( + "context" + "github.com/seata/seata-go/pkg/tm" +) + +type TestSagaTwoPhaseService struct{} + +func (*TestSagaTwoPhaseService) Action(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) { + return true, nil +} + +func (*TestSagaTwoPhaseService) Compensation(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) { + return true, nil +}