Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gray mode migrate accerate #12957

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@
import com.alibaba.nacos.config.server.service.repository.ConfigInfoBetaPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoGrayPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoTagPersistService;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.persistence.model.Page;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.alibaba.nacos.config.server.utils.LogUtil.DEFAULT_LOG;
import static com.alibaba.nacos.config.server.utils.PropertyUtil.GRAY_MIGRATE_FLAG;

/**
* migrate beta and tag to gray model. should only invoked from config sync notify.
Expand Down Expand Up @@ -60,8 +65,10 @@ public ConfigGrayModelMigrateService(ConfigInfoBetaPersistService configInfoBeta
* migrate beta&tag to gray .
*/
@PostConstruct
public void migrate() {
doCheckMigrate();
public void migrate() throws Exception {
if (PropertyUtil.isGrayCompatibleModel()) {
doCheckMigrate();
}
}

/**
Expand All @@ -74,8 +81,8 @@ public void migrate() {
public void checkMigrateBeta(String dataId, String group, String tenant) {
ConfigInfoBetaWrapper configInfo4Beta = configInfoBetaPersistService.findConfigInfo4Beta(dataId, group, tenant);
if (configInfo4Beta == null) {
ConfigInfoGrayWrapper configInfoGrayWrapper = configInfoGrayPersistService.findConfigInfo4Gray(dataId, group,
tenant, BetaGrayRule.TYPE_BETA);
ConfigInfoGrayWrapper configInfoGrayWrapper = configInfoGrayPersistService.findConfigInfo4Gray(dataId,
group, tenant, BetaGrayRule.TYPE_BETA);
if (configInfoGrayWrapper == null) {
return;
}
Expand Down Expand Up @@ -133,7 +140,12 @@ public void checkMigrateTag(String dataId, String group, String tenant, String t
}
}

private void doCheckMigrate() {
private void doCheckMigrate() throws Exception {

ThreadPoolExecutor executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(PropertyUtil.getAllDumpPageSize() * 2),
r -> new Thread(r, "gray migrate worker"), new ThreadPoolExecutor.CallerRunsPolicy());
int pageSize = 100;
int rowCount = configInfoBetaPersistService.configInfoBetaCount();
int pageCount = (int) Math.ceil(rowCount * 1.0 / pageSize);
Expand All @@ -144,21 +156,40 @@ private void doCheckMigrate() {
if (page != null) {
for (ConfigInfoBetaWrapper cf : page.getPageItems()) {

ConfigInfoGrayWrapper configInfo4Gray = configInfoGrayPersistService.findConfigInfo4Gray(
cf.getDataId(), cf.getGroup(), cf.getTenant(), BetaGrayRule.TYPE_BETA);
if (configInfo4Gray == null || configInfo4Gray.getLastModified() < cf.getLastModified()) {
DEFAULT_LOG.info("[migrate beta to gray] dataId={}, group={}, tenant={}, md5={}",
cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getMd5());
ConfigGrayPersistInfo localConfigGrayPersistInfo = new ConfigGrayPersistInfo(
BetaGrayRule.TYPE_BETA, BetaGrayRule.VERSION, cf.getBetaIps(), BetaGrayRule.PRIORITY);
configInfoGrayPersistService.insertOrUpdateGray(cf, BetaGrayRule.TYPE_BETA,
GrayRuleManager.serializeConfigGrayPersistInfo(localConfigGrayPersistInfo),
NetUtils.localIP(), "nacos_auto_migrate");
}
executorService.execute(() -> {
GRAY_MIGRATE_FLAG.set(true);
ConfigInfoGrayWrapper configInfo4Gray = configInfoGrayPersistService.findConfigInfo4Gray(
cf.getDataId(), cf.getGroup(), cf.getTenant(), BetaGrayRule.TYPE_BETA);
if (configInfo4Gray == null || configInfo4Gray.getLastModified() < cf.getLastModified()) {
DEFAULT_LOG.info("[migrate beta to gray] dataId={}, group={}, tenant={}, md5={}",
cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getMd5());
ConfigGrayPersistInfo localConfigGrayPersistInfo = new ConfigGrayPersistInfo(
BetaGrayRule.TYPE_BETA, BetaGrayRule.VERSION, cf.getBetaIps(),
BetaGrayRule.PRIORITY);
configInfoGrayPersistService.insertOrUpdateGray(cf, BetaGrayRule.TYPE_BETA,
GrayRuleManager.serializeConfigGrayPersistInfo(localConfigGrayPersistInfo),
NetUtils.localIP(), "nacos_auto_migrate");
GRAY_MIGRATE_FLAG.set(false);
}
});

}
actualRowCount += page.getPageItems().size();
DEFAULT_LOG.info("[gray-migrate-beta] submit gray task {} / {}", actualRowCount, rowCount);

}
}

try {
int unfinishedTaskCount = 0;
while ((unfinishedTaskCount = executorService.getQueue().size() + executorService.getActiveCount()) > 0) {
DEFAULT_LOG.info("[gray-migrate-beta] wait {} migrate tasks to be finished", unfinishedTaskCount);
Thread.sleep(1000L);
}

} catch (Exception e) {
DEFAULT_LOG.error("[gray-migrate-beta] wait dump tasks to be finished error", e);
throw e;
}

rowCount = configInfoTagPersistService.configInfoTagCount();
Expand All @@ -169,23 +200,46 @@ private void doCheckMigrate() {
pageSize);
if (page != null) {
for (ConfigInfoTagWrapper cf : page.getPageItems()) {
ConfigInfoGrayWrapper configInfo4Gray = configInfoGrayPersistService.findConfigInfo4Gray(
cf.getDataId(), cf.getGroup(), cf.getTenant(), TagGrayRule.TYPE_TAG + "_" + cf.getTag());
if (configInfo4Gray == null || configInfo4Gray.getLastModified() < cf.getLastModified()) {
DEFAULT_LOG.info("[migrate tag to gray] dataId={}, group={}, tenant={}, md5={}",
cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getMd5());
ConfigGrayPersistInfo localConfigGrayPersistInfo = new ConfigGrayPersistInfo(
TagGrayRule.TYPE_TAG, TagGrayRule.VERSION, cf.getTag(), TagGrayRule.PRIORITY);
configInfoGrayPersistService.insertOrUpdateGray(cf, TagGrayRule.TYPE_TAG + "_" + cf.getTag(),
GrayRuleManager.serializeConfigGrayPersistInfo(localConfigGrayPersistInfo),
NetUtils.localIP(), "nacos_auto_migrate");
}

executorService.execute(() -> {
GRAY_MIGRATE_FLAG.set(true);
ConfigInfoGrayWrapper configInfo4Gray = configInfoGrayPersistService.findConfigInfo4Gray(
cf.getDataId(), cf.getGroup(), cf.getTenant(),
TagGrayRule.TYPE_TAG + "_" + cf.getTag());
if (configInfo4Gray == null || configInfo4Gray.getLastModified() < cf.getLastModified()) {
DEFAULT_LOG.info("[migrate tag to gray] dataId={}, group={}, tenant={}, md5={}",
cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getMd5());
ConfigGrayPersistInfo localConfigGrayPersistInfo = new ConfigGrayPersistInfo(
TagGrayRule.TYPE_TAG, TagGrayRule.VERSION, cf.getTag(), TagGrayRule.PRIORITY);
configInfoGrayPersistService.insertOrUpdateGray(cf,
TagGrayRule.TYPE_TAG + "_" + cf.getTag(),
GrayRuleManager.serializeConfigGrayPersistInfo(localConfigGrayPersistInfo),
NetUtils.localIP(), "nacos_auto_migrate");
GRAY_MIGRATE_FLAG.set(false);
}
});

}

actualRowCount += page.getPageItems().size();
DEFAULT_LOG.info("[-tag] {} / {}", actualRowCount, rowCount);
DEFAULT_LOG.info("[gray-migrate-tag] submit gray task {} / {}", actualRowCount, rowCount);
}
}

try {
int unfinishedTaskCount = 0;
while ((unfinishedTaskCount = executorService.getQueue().size() + executorService.getActiveCount()) > 0) {
DEFAULT_LOG.info("[gray-migrate-tag] wait {} migrate tasks to be finished", unfinishedTaskCount);
Thread.sleep(1000L);
}

} catch (Exception e) {
DEFAULT_LOG.error("[gray-migrate-tag] wait migrate tasks to be finished error", e);
throw e;
}
//shut down migrate executor
executorService.shutdown();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import static com.alibaba.nacos.config.server.service.repository.ConfigRowMapperInjector.CONFIG_INFO_GRAY_WRAPPER_ROW_MAPPER;
import static com.alibaba.nacos.config.server.service.repository.ConfigRowMapperInjector.CONFIG_INFO_STATE_WRAPPER_ROW_MAPPER;
import static com.alibaba.nacos.config.server.utils.PropertyUtil.GRAY_MIGRATE_FLAG;

/**
* EmbeddedConfigInfoGrayPersistServiceImpl.
Expand Down Expand Up @@ -147,15 +148,16 @@ public ConfigOperateResult addConfigInfo4Gray(ConfigInfo configInfo, String gray
long hisId = idGeneratorManager.nextId(RESOURCE_CONFIG_HISTORY_ID);

addConfigInfoGrayAtomic(configGrayId, configInfo, grayNameTmp, grayRuleTmp, srcIp, srcUser);

Timestamp now = new Timestamp(System.currentTimeMillis());
historyConfigInfoPersistService.insertConfigHistoryAtomic(hisId, configInfo, srcIp, srcUser, now, "I",
Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(grayNameTmp, grayRuleTmp, srcUser));

if (!GRAY_MIGRATE_FLAG.get()) {
historyConfigInfoPersistService.insertConfigHistoryAtomic(hisId, configInfo, srcIp, srcUser, now, "I",
Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(grayNameTmp, grayRuleTmp, srcUser));
}

EmbeddedStorageContextUtils.onModifyConfigGrayInfo(configInfo, grayNameTmp, grayRuleTmp, srcIp, now);
databaseOperate.blockUpdate();

return getGrayOperateResult(configInfo.getDataId(), configInfo.getGroup(), tenantTmp, grayNameTmp);
} finally {
EmbeddedStorageContextHolder.cleanAllContext();
Expand Down Expand Up @@ -270,13 +272,12 @@ public ConfigOperateResult updateConfigInfo4Gray(ConfigInfo configInfo, String g
Arrays.asList("data_id", "group_id", "tenant_id", "gray_name"));
final Object[] args = new Object[] {configInfo.getContent(), md5, srcIp, srcUser, time, appNameTmp,
grayRuleTmp, configInfo.getDataId(), configInfo.getGroup(), tenantTmp, grayNameTmp};

Timestamp now = new Timestamp(System.currentTimeMillis());
historyConfigInfoPersistService.insertConfigHistoryAtomic(oldConfigAllInfo4Gray.getId(),
oldConfigAllInfo4Gray, srcIp, srcUser, now, "U", Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(oldConfigAllInfo4Gray.getGrayName(),
oldConfigAllInfo4Gray.getGrayRule(), oldConfigAllInfo4Gray.getSrcUser()));

if (!GRAY_MIGRATE_FLAG.get()) {
historyConfigInfoPersistService.insertConfigHistoryAtomic(oldConfigAllInfo4Gray.getId(),
oldConfigAllInfo4Gray, srcIp, srcUser, time, "U", Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(oldConfigAllInfo4Gray.getGrayName(),
oldConfigAllInfo4Gray.getGrayRule(), oldConfigAllInfo4Gray.getSrcUser()));
}
EmbeddedStorageContextUtils.onModifyConfigGrayInfo(configInfo, grayNameTmp, grayRuleTmp, srcIp, time);
EmbeddedStorageContextHolder.addSqlContext(sql, args);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import static com.alibaba.nacos.config.server.service.repository.ConfigRowMapperInjector.CONFIG_INFO_GRAY_WRAPPER_ROW_MAPPER;
import static com.alibaba.nacos.config.server.service.repository.ConfigRowMapperInjector.CONFIG_INFO_STATE_WRAPPER_ROW_MAPPER;
import static com.alibaba.nacos.config.server.utils.PropertyUtil.GRAY_MIGRATE_FLAG;


/**
Expand Down Expand Up @@ -136,11 +137,12 @@ public ConfigOperateResult addConfigInfo4Gray(ConfigInfo configInfo, String gray
try {
addConfigInfoGrayAtomic(-1, configInfo, grayNameTmp, grayRuleTmp, srcIp, srcUser);

Timestamp now = new Timestamp(System.currentTimeMillis());
historyConfigInfoPersistService.insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, now, "I",
Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(grayNameTmp, grayRuleTmp, srcUser));

if (!GRAY_MIGRATE_FLAG.get()) {
Timestamp now = new Timestamp(System.currentTimeMillis());
historyConfigInfoPersistService.insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, now, "I",
Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(grayNameTmp, grayRuleTmp, srcUser));
}
return getGrayOperateResult(configInfo.getDataId(), configInfo.getGroup(), tenantTmp, grayNameTmp);
} catch (Exception e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
Expand Down Expand Up @@ -251,11 +253,12 @@ public ConfigOperateResult updateConfigInfo4Gray(ConfigInfo configInfo, String g
configInfo.getDataId(), configInfo.getGroup(), tenantTmp, grayNameTmp);

Timestamp now = new Timestamp(System.currentTimeMillis());
historyConfigInfoPersistService.insertConfigHistoryAtomic(oldConfigAllInfo4Gray.getId(),
oldConfigAllInfo4Gray, srcIp, srcUser, now, "U", Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(oldConfigAllInfo4Gray.getGrayName(),
oldConfigAllInfo4Gray.getGrayRule(), oldConfigAllInfo4Gray.getSrcUser()));

if (!GRAY_MIGRATE_FLAG.get()) {
historyConfigInfoPersistService.insertConfigHistoryAtomic(oldConfigAllInfo4Gray.getId(),
oldConfigAllInfo4Gray, srcIp, srcUser, now, "U", Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(oldConfigAllInfo4Gray.getGrayName(),
oldConfigAllInfo4Gray.getGrayRule(), oldConfigAllInfo4Gray.getSrcUser()));
}
return getGrayOperateResult(configInfo.getDataId(), configInfo.getGroup(), tenantTmp, grayNameTmp);
} catch (CannotGetJdbcConnectionException e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
*/
private static boolean grayCompatibleModel = true;

public static final ThreadLocal<Boolean> GRAY_MIGRATE_FLAG = ThreadLocal.withInitial(() -> false);

/**
* Whether to enable the limit check function of capacity management, including the upper limit of configuration
* number, configuration content size limit, etc.
Expand Down Expand Up @@ -107,12 +109,12 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
private static int correctUsageDelay = 10 * 60;

private static boolean dumpChangeOn = true;

/**
* The number of days to retain the configuration history, the default is 30 days.
*/
private static int configRententionDays = 30;

/**
* dumpChangeWorkerInterval, default 30 seconds.
*/
Expand Down Expand Up @@ -240,6 +242,7 @@ public static void setDefaultMaxAggrCount(int defaultMaxAggrCount) {

/**
* control whether persist beta and tag to old model.
*
* @return
*/
public static boolean isGrayCompatibleModel() {
Expand All @@ -265,13 +268,13 @@ public static int getCorrectUsageDelay() {
public static void setCorrectUsageDelay(int correctUsageDelay) {
PropertyUtil.correctUsageDelay = correctUsageDelay;
}

public static int getConfigRententionDays() {
return configRententionDays;
}

private void setConfigRententionDays() {
String val = getProperty(PropertiesConstant.CONFIG_RENTENTION_DAYS);
String val = getProperty(PropertiesConstant.CONFIG_RENTENTION_DAYS);
if (null != val) {
int tmp = 0;
try {
Expand All @@ -284,7 +287,7 @@ private void setConfigRententionDays() {
}
}
}

public static boolean isStandaloneMode() {
return EnvUtil.getStandaloneMode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ public class ConfigInfoGrayMapperByMySql extends AbstractMapperByMysql implement

@Override
public MapperResult findAllConfigInfoGrayForDumpAllFetchRows(MapperContext context) {
String sql = " SELECT t.id,data_id,group_id,tenant_id,gray_name,gray_rule,app_name,content,md5,gmt_modified "
+ " FROM ( SELECT id FROM config_info_gray ORDER BY id LIMIT " + context.getStartRow() + ","
+ context.getPageSize() + " ) " + "g, config_info_gray t WHERE g.id = t.id ";
String sql = " SELECT id,data_id,group_id,tenant_id,gray_name,gray_rule,app_name,content,md5,gmt_modified "
+ " FROM config_info_gray ORDER BY id LIMIT " + context.getStartRow() + "," + context.getPageSize();
return new MapperResult(sql, Collections.emptyList());
}

Expand Down
Loading