Skip to content

Commit

Permalink
optimize: distinguish database behavior according to the branch type (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
booogu authored Apr 27, 2020
1 parent b8d28f1 commit e5c68a7
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 96 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ before_script:

script:
- if [ "$TRAVIS_BRANCH" == "develop" ] && [ "$TRAVIS_PULL_REQUEST" == false ]; then
travis_wait 30 ./mvnw clean install -DskipTests=false -Dcheckstyle.skip=false -Dlicense.skip=false -q -P image -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
travis_wait 30 ./mvnw clean install -DskipTests=false -Dcheckstyle.skip=false -Dlicense.skip=false -P image -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
else
travis_wait 30 ./mvnw clean install -DskipTests=false -Dcheckstyle.skip=false -Dlicense.skip=false -q -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
travis_wait 30 ./mvnw clean install -DskipTests=false -Dcheckstyle.skip=false -Dlicense.skip=false -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
fi
after_success:
- bash <(curl -s https://codecov.io/bash)
105 changes: 45 additions & 60 deletions core/src/main/java/io/seata/core/context/RootContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
*/
package io.seata.core.context;

import java.util.Map;

import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.util.StringUtils;
import io.seata.core.model.BranchType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* The type Root context.
Expand All @@ -41,7 +42,10 @@ private RootContext() {
*/
public static final String KEY_XID = "TX_XID";

public static final String KEY_XID_INTERCEPTOR_TYPE = "tx-xid-interceptor-type";
/**
* The constant KEY_BRANCH_TYPE
*/
public static final String KEY_BRANCH_TYPE = "TX_BRANCH_TYPE";

public static final String KEY_GLOBAL_LOCK_FLAG = "TX_LOCK";

Expand All @@ -57,24 +61,9 @@ public static String getXID() {
if (StringUtils.isNotBlank(xid)) {
return xid;
}

String xidType = CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR_TYPE);
if (StringUtils.isNotBlank(xidType) && xidType.contains("_")) {
return xidType.split("_")[0];
}

return null;
}

/**
* Gets xid.
*
* @return the xid
*/
public static String getXIDInterceptorType() {
return CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR_TYPE);
}

/**
* Bind.
*
Expand All @@ -87,36 +76,6 @@ public static void bind(String xid) {
CONTEXT_HOLDER.put(KEY_XID, xid);
}

/**
* Bind interceptor type
*
* @param xidType
*/
public static void bindInterceptorType(String xidType) {
if (StringUtils.isNotBlank(xidType)) {

String[] xidTypes = xidType.split("_");

if (xidTypes.length == 2) {
bindInterceptorType(xidTypes[0], BranchType.valueOf(xidTypes[1]));
}
}
}

/**
* Bind interceptor type
*
* @param xid
* @param branchType
*/
public static void bindInterceptorType(String xid, BranchType branchType) {
String xidType = String.format("%s_%s", xid, branchType.name());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind interceptor type xid={} branchType={}", xid, branchType);
}
CONTEXT_HOLDER.put(KEY_XID_INTERCEPTOR_TYPE, xidType);
}

/**
* declare local transactions will use global lock check for update/delete/insert/selectForUpdate SQL
*/
Expand All @@ -143,19 +102,6 @@ public static String unbind() {
return xid;
}

/**
* Unbind temporary string
*
* @return the string
*/
public static String unbindInterceptorType() {
String xidType = CONTEXT_HOLDER.remove(KEY_XID_INTERCEPTOR_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind inteceptor type {}", xidType);
}
return xidType;
}

public static void unbindGlobalLockFlag() {
String lockFlag = CONTEXT_HOLDER.remove(KEY_GLOBAL_LOCK_FLAG);
if (LOGGER.isDebugEnabled() && lockFlag != null) {
Expand All @@ -172,6 +118,45 @@ public static boolean inGlobalTransaction() {
return CONTEXT_HOLDER.get(KEY_XID) != null;
}

/**
* get the branch type
*
* @return the branch type String
*/
public static String getBranchType() {
String branchType = CONTEXT_HOLDER.get(KEY_BRANCH_TYPE);
if (StringUtils.isNotBlank(branchType)) {
return branchType;
}
return null;
}

/**
* bind branch type
*
* @param branchType the branch type
*/
public static void bindBranchType(BranchType branchType) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind branch type {}", branchType);
}

CONTEXT_HOLDER.put(KEY_BRANCH_TYPE, branchType.name());
}

/**
* unbind branch type
*
* @return the previous branch type string
*/
public static String unbindBranchType() {
String unbindBranchType = CONTEXT_HOLDER.remove(KEY_BRANCH_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind branch type {}", unbindBranchType);
}
return unbindBranchType;
}

/**
* requires global lock check
*
Expand Down
14 changes: 14 additions & 0 deletions core/src/test/java/io/seata/core/context/RootContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.seata.common.exception.ShouldNeverHappenException;

import io.seata.core.model.BranchType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -31,6 +32,8 @@ public class RootContextTest {

private final String DEFAULT_XID = "default_xid";

private final BranchType DEFAULT_BRANCH_TYPE = BranchType.AT;

/**
* Test bind and unbind.
*/
Expand Down Expand Up @@ -94,4 +97,15 @@ public void testAssertNotInGlobalTransaction() {
assertThat(RootContext.getXID()).isNull();
}

@Test
public void testBindBranchType_And_UnbindBranchType(){
assertThat(RootContext.getBranchType()).isNull();
assertThat(RootContext.unbindBranchType()).isNull();
RootContext.bindBranchType(DEFAULT_BRANCH_TYPE);
assertThat(RootContext.unbindBranchType()).isEqualTo(DEFAULT_BRANCH_TYPE.name());
RootContext.unbindBranchType();
assertThat(RootContext.getBranchType()).isNull();
assertThat(RootContext.unbindBranchType()).isNull();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import io.seata.common.util.StringUtils;
import io.seata.core.context.RootContext;
import io.seata.core.constants.DubboConstants;
import io.seata.core.model.BranchType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,45 +45,51 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
return invoker.invoke(invocation);
}
String xid = RootContext.getXID();
String xidInterceptorType = RootContext.getXIDInterceptorType();
String branchType = RootContext.getBranchType();

String rpcXid = getRpcXid();
String rpcXidInterceptorType = RpcContext.getContext().getAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE);
String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
}
boolean bind = false;
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE, xidInterceptorType);
RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType);
} else {
if (rpcXid != null) {
RootContext.bind(rpcXid);
RootContext.bindInterceptorType(rpcXidInterceptorType);
if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
}
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] interceptorType[{}] to RootContext", rpcXid, rpcXidInterceptorType);
LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType != null ? rpcBranchType : "AT");
}
}
}
try {
return invoker.invoke(invocation);

} finally {
if (bind) {
String unbindInterceptorType = RootContext.unbindInterceptorType();
String unbindXid = RootContext.unbind();
String previousBranchType = RootContext.getBranchType();
if (StringUtils.equals(BranchType.TCC.name(), previousBranchType)) {
RootContext.unbindBranchType();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[{}] interceptorType[{}] from RootContext", unbindXid, unbindInterceptorType);
LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType != null ? previousBranchType : "AT");
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {}, xidInterceptorType from {} to {} ", rpcXid,
unbindXid, rpcXidInterceptorType, unbindInterceptorType);
LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid,
rpcBranchType != null ? rpcBranchType : "AT", previousBranchType != null ? previousBranchType : "AT");
if (unbindXid != null) {
RootContext.bind(unbindXid);
RootContext.bindInterceptorType(unbindInterceptorType);
LOGGER.warn("bind [{}] interceptorType[{}] back to RootContext", unbindXid,
unbindInterceptorType);
LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);
if (StringUtils.equals(BranchType.TCC.name(), previousBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType != null ? previousBranchType : "AT");
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package io.seata.integration.dubbo;

import io.seata.common.util.StringUtils;
import io.seata.core.constants.DubboConstants;
import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
Expand All @@ -25,7 +28,6 @@
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.seata.core.constants.DubboConstants;

/**
* The type Transaction propagation filter.
Expand All @@ -40,42 +42,51 @@ public class ApacheDubboTransactionPropagationFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID();
String xidInterceptorType = RootContext.getXIDInterceptorType();
String branchType = RootContext.getBranchType();

String rpcXid = getRpcXid();
String rpcXidInterceptorType = RpcContext.getContext().getAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE);
String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
}
boolean bind = false;
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE, xidInterceptorType);
RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType);
} else {
if (rpcXid != null) {
RootContext.bind(rpcXid);
RootContext.bindInterceptorType(rpcXidInterceptorType);
if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
}
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] interceptorType[{}] to RootContext", rpcXid, rpcXidInterceptorType);
LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType != null ? rpcBranchType : "AT");
}
}
}
try {
return invoker.invoke(invocation);
} finally {
if (bind) {
String unbindInterceptorType = RootContext.unbindInterceptorType();
String unbindXid = RootContext.unbind();
String previousBranchType = RootContext.getBranchType();
if (StringUtils.equals(BranchType.TCC.name(), previousBranchType)) {
RootContext.unbindBranchType();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[{}] interceptorType[{}] from RootContext", unbindXid, unbindInterceptorType);
LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType != null ? previousBranchType : "AT");
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {}, xidInterceptorType from {} to {} ", rpcXid, unbindXid, rpcXidInterceptorType, unbindInterceptorType);
LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid,
rpcBranchType != null ? rpcBranchType : "AT", previousBranchType != null ? previousBranchType : "AT");
if (unbindXid != null) {
RootContext.bind(unbindXid);
RootContext.bindInterceptorType(unbindInterceptorType);
LOGGER.warn("bind [{}] interceptorType[{}] back to RootContext", unbindXid, unbindInterceptorType);
LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);
if (StringUtils.equals(BranchType.TCC.name(), previousBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType != null ? previousBranchType : "AT");
}
}
}
}
Expand Down
Loading

0 comments on commit e5c68a7

Please sign in to comment.