Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -84,18 +86,49 @@ protected void initHandler(List<TableRuntime> tableRuntimeList) {
.forEach(
tableRuntime -> {
if (scheduledTables.add(tableRuntime.getTableIdentifier())) {
executor.schedule(
() -> executeTask(tableRuntime), getStartDelay(), TimeUnit.MILLISECONDS);
scheduleTableExecution(
tableRuntime, calculateExecutionDelay(tableRuntime, getCleanupOperation()));
}
});

logger.info("Table executor {} initialized", getClass().getSimpleName());
}

/**
 * Work out how long to wait before the first execution of a table's task.
 *
 * <p>NOTE(review): when the task is not due yet, the delay is a full {@code
 * getNextExecutingTime(tableRuntime)} plus the start delay; time already elapsed since the last
 * cleanup is not subtracted. Confirm this is intended.
 *
 * @param tableRuntime the table runtime being scheduled
 * @param cleanupOperation the cleanup operation this scheduler performs
 * @return the delay in milliseconds before the task should run
 */
private long calculateExecutionDelay(
    TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
  boolean dueNow = shouldExecuteTask(tableRuntime, cleanupOperation);
  if (!dueNow) {
    // Not due yet: wait for the next execution time. The extra start delay spreads the
    // executions of multiple tables so they are less likely to fire simultaneously and
    // cause load spikes.
    long nextExecutingTime = getNextExecutingTime(tableRuntime);
    return nextExecutingTime + getStartDelay();
  }

  // Due now: run after the short start delay only.
  return getStartDelay();
}

/**
 * Submit a one-shot execution of the table's task to the executor.
 *
 * @param tableRuntime the table runtime whose task is scheduled
 * @param delay how long to wait before running, in milliseconds
 */
private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
  Runnable tableTask = () -> executeTask(tableRuntime);
  executor.schedule(tableTask, delay, TimeUnit.MILLISECONDS);

  logger.debug(
      "Scheduled execution for table {} with delay {} ms",
      tableRuntime.getTableIdentifier(),
      delay);
}

private void executeTask(TableRuntime tableRuntime) {
try {
if (isExecutable(tableRuntime)) {
execute(tableRuntime);
// Different tables take different amounts of time to execute the end of execute(),
// so you need to perform the update operation separately for each table.
persistUpdatingCleanupTime(tableRuntime);
}
} finally {
scheduledTables.remove(tableRuntime.getTableIdentifier());
Expand All @@ -117,6 +150,99 @@ protected final void scheduleIfNecessary(TableRuntime tableRuntime, long millise

/**
 * Perform this scheduler's work for a single table.
 *
 * <p>Called from {@code executeTask} only when {@code isExecutable(tableRuntime)} is true; once
 * this method returns, the last cleanup time for the table is persisted.
 *
 * @param tableRuntime the table runtime to process
 */
protected abstract void execute(TableRuntime tableRuntime);

/**
 * Decide whether the task may run given when the last cleanup finished.
 *
 * <p>This base implementation always returns {@code true}. {@code shouldExecuteTask} consults it
 * only when a non-zero last cleanup time has been recorded for the table.
 *
 * @param lastCleanupEndTime the recorded last cleanup time; presumably epoch milliseconds, since
 *     it is written from {@code System.currentTimeMillis()} — confirm against the state store
 * @return true if the task should be executed now
 */
protected boolean shouldExecute(Long lastCleanupEndTime) {
return true;
}

/**
 * Record the current time as the last cleanup time of this scheduler's operation for the table.
 *
 * <p>Does nothing when the operation is not tracked for this table (see {@code
 * shouldSkipOperation}). A failure to persist is logged and not propagated.
 *
 * @param tableRuntime the table runtime whose cleanup time is updated
 */
private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
  CleanupOperation operation = getCleanupOperation();
  if (!shouldSkipOperation(tableRuntime, operation)) {
    try {
      // The cast is safe here: shouldSkipOperation returned false, so the runtime is a
      // DefaultTableRuntime.
      DefaultTableRuntime defaultRuntime = (DefaultTableRuntime) tableRuntime;
      defaultRuntime.updateLastCleanTime(operation, System.currentTimeMillis());

      logger.debug(
          "Update lastCleanTime for table {} with cleanup operation {}",
          tableRuntime.getTableIdentifier().getTableName(),
          operation);
    } catch (Exception failure) {
      logger.error(
          "Failed to update lastCleanTime for table {}",
          tableRuntime.getTableIdentifier().getTableName(),
          failure);
    }
  }
}

/**
 * Get the cleanup operation this scheduler performs.
 *
 * <p>The default is {@link CleanupOperation#NONE}, for which no last cleanup time is checked or
 * persisted. Subclasses override this method to return their specific operation.
 *
 * @return the cleanup operation of this scheduler
 */
protected CleanupOperation getCleanupOperation() {
return CleanupOperation.NONE;
}

/**
 * Decide whether a table's task should run right away rather than wait for the next interval.
 *
 * @param tableRuntime the table runtime to check
 * @param cleanupOperation the cleanup operation this scheduler performs
 * @return true when the cleanup time is not tracked, when no cleanup has been recorded yet, or
 *     when {@link #shouldExecute(Long)} accepts the recorded last cleanup time
 */
protected boolean shouldExecuteTask(
    TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
  if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
    // No cleanup time is tracked for this table/operation, so nothing can postpone the task.
    return true;
  }

  DefaultTableRuntime defaultRuntime = (DefaultTableRuntime) tableRuntime;
  long lastCleanTime = defaultRuntime.getLastCleanTime(cleanupOperation);

  // Zero means no cleanup has been recorded yet: run the task.
  boolean neverCleaned = lastCleanTime == 0L;
  if (neverCleaned) {
    logger.debug(
        "LastCleanupTime for table {} with operation {} is not exist, executing task",
        tableRuntime.getTableIdentifier().getTableName(),
        cleanupOperation);
    return true;
  }

  // After AMS restarts, a cleanup operation is re-executed only if enough time has elapsed
  // since the last cleanup; the subclass decides what "enough" means.
  boolean due = shouldExecute(lastCleanTime);
  String message;
  if (due) {
    message = "Should execute task for table {} with {}";
  } else {
    message =
        "Not enough time has passed since last cleanup for table {} with {}, delaying execution";
  }
  logger.debug(message, tableRuntime.getTableIdentifier().getTableName(), cleanupOperation);

  return due;
}

/**
 * Tell whether cleanup-time tracking does not apply to the given table and operation.
 *
 * <p>Tracking is skipped when the operation is {@link CleanupOperation#NONE} or when the runtime
 * is not a {@link DefaultTableRuntime}; the reason is logged at debug level.
 *
 * @param tableRuntime the table runtime to check
 * @param cleanupOperation the cleanup operation to perform
 * @return true if the cleanup time check should be skipped, false otherwise
 */
private boolean shouldSkipOperation(
    TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
  final String skipMessage;
  if (cleanupOperation == CleanupOperation.NONE) {
    skipMessage = "No cleanup operation specified, skipping cleanup time check for table {}";
  } else if (!(tableRuntime instanceof DefaultTableRuntime)) {
    skipMessage =
        "Table runtime is not DefaultTableRuntime, skipping cleanup time check for table {}";
  } else {
    // Both preconditions hold: the cleanup time can be read and written for this table.
    return false;
  }

  logger.debug(skipMessage, tableRuntime.getTableIdentifier().getTableName());
  return true;
}

protected String getThreadName() {
return String.join("-", StringUtils.splitByCharacterTypeCamelCase(getClass().getSimpleName()))
.toLowerCase(Locale.ROOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,6 +49,16 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return INTERVAL;
}

/**
 * Allow execution once at least {@code INTERVAL} has passed since the last cleanup.
 *
 * @param lastCleanupEndTime the recorded last cleanup time, compared against the current time
 * @return true if the elapsed time is at least {@code INTERVAL}
 */
@Override
protected boolean shouldExecute(Long lastCleanupEndTime) {
  long elapsedSinceLastCleanup = System.currentTimeMillis() - lastCleanupEndTime;
  return elapsedSinceLastCleanup >= INTERVAL;
}

/** Identifies this scheduler's work as {@link CleanupOperation#DANGLING_DELETE_FILES_CLEANING}. */
@Override
protected CleanupOperation getCleanupOperation() {
return CleanupOperation.DANGLING_DELETE_FILES_CLEANING;
}

@Override
protected boolean enabled(TableRuntime tableRuntime) {
return tableRuntime instanceof DefaultTableRuntime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,6 +48,16 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return interval.toMillis();
}

/**
 * Allow execution once at least the configured {@code interval} has passed since the last
 * cleanup.
 *
 * @param lastCleanupEndTime the recorded last cleanup time, compared against the current time
 * @return true if the elapsed time is at least {@code interval.toMillis()}
 */
@Override
protected boolean shouldExecute(Long lastCleanupEndTime) {
  long elapsedSinceLastCleanup = System.currentTimeMillis() - lastCleanupEndTime;
  return elapsedSinceLastCleanup >= interval.toMillis();
}

/** Identifies this scheduler's work as {@link CleanupOperation#DATA_EXPIRING}. */
@Override
protected CleanupOperation getCleanupOperation() {
return CleanupOperation.DATA_EXPIRING;
}

@Override
protected boolean enabled(TableRuntime tableRuntime) {
return tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,6 +46,16 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) {
return interval.toMillis();
}

/**
 * Allow execution once at least the configured {@code interval} has passed since the last
 * cleanup.
 *
 * @param lastCleanupEndTime the recorded last cleanup time, compared against the current time
 * @return true if the elapsed time is at least {@code interval.toMillis()}
 */
@Override
protected boolean shouldExecute(Long lastCleanupEndTime) {
  long elapsedSinceLastCleanup = System.currentTimeMillis() - lastCleanupEndTime;
  return elapsedSinceLastCleanup >= interval.toMillis();
}

/** Identifies this scheduler's work as {@link CleanupOperation#ORPHAN_FILES_CLEANING}. */
@Override
protected CleanupOperation getCleanupOperation() {
return CleanupOperation.ORPHAN_FILES_CLEANING;
}

@Override
protected boolean enabled(TableRuntime tableRuntime) {
return tableRuntime.getTableConfiguration().isCleanOrphanEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,6 +56,16 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or
scheduleIfNecessary(tableRuntime, getStartDelay());
}

/**
 * Allow execution once at least {@code INTERVAL} has passed since the last cleanup.
 *
 * @param lastCleanupEndTime the recorded last cleanup time, compared against the current time
 * @return true if the elapsed time is at least {@code INTERVAL}
 */
@Override
protected boolean shouldExecute(Long lastCleanupEndTime) {
  long elapsedSinceLastCleanup = System.currentTimeMillis() - lastCleanupEndTime;
  return elapsedSinceLastCleanup >= INTERVAL;
}

/** Identifies this scheduler's work as {@link CleanupOperation#SNAPSHOTS_EXPIRING}. */
@Override
protected CleanupOperation getCleanupOperation() {
return CleanupOperation.SNAPSHOTS_EXPIRING;
}

@Override
protected long getExecutorDelay() {
return ThreadLocalRandom.current().nextLong(INTERVAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.table.blocker.TableBlocker;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -81,11 +83,17 @@ public class DefaultTableRuntime extends AbstractTableRuntime
.jsonType(AbstractOptimizingEvaluator.PendingInput.class)
.defaultValue(new AbstractOptimizingEvaluator.PendingInput());

// State key "cleanup_state": holds the table's TableRuntimeCleanupState (declared as a JSON
// type), which carries the last-run timestamps of the cleanup operations.
// NOTE(review): the default value is one shared instance, and updateLastCleanTime mutates the
// state object it is handed in place. Confirm the state store never hands out this default
// instance itself, otherwise timestamps could leak between tables.
private static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
StateKey.stateKey("cleanup_state")
.jsonType(TableRuntimeCleanupState.class)
.defaultValue(new TableRuntimeCleanupState());

// State key "process_id": a long-typed state that defaults to 0.
private static final StateKey<Long> PROCESS_ID_KEY =
StateKey.stateKey("process_id").longType().defaultValue(0L);

public static final List<StateKey<?>> REQUIRED_STATES =
Lists.newArrayList(OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY);
Lists.newArrayList(
OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY, CLEANUP_STATE_KEY);

private final Map<Action, TableProcessContainer> processContainerMap = Maps.newConcurrentMap();
private final TableOptimizingMetrics optimizingMetrics;
Expand Down Expand Up @@ -353,6 +361,47 @@ public void beginProcess(OptimizingProcess optimizingProcess) {
.commit();
}

/**
 * Read the last recorded run time of a cleanup operation from the stored cleanup state.
 *
 * @param operation the cleanup operation to look up
 * @return the recorded time for the operation, or 0 for an operation that has no tracked time
 *     (such as {@link CleanupOperation#NONE})
 */
public long getLastCleanTime(CleanupOperation operation) {
  TableRuntimeCleanupState cleanupState = store().getState(CLEANUP_STATE_KEY);

  long lastCleanTime;
  switch (operation) {
    case ORPHAN_FILES_CLEANING:
      lastCleanTime = cleanupState.getLastOrphanFilesCleanTime();
      break;
    case DANGLING_DELETE_FILES_CLEANING:
      lastCleanTime = cleanupState.getLastDanglingDeleteFilesCleanTime();
      break;
    case DATA_EXPIRING:
      lastCleanTime = cleanupState.getLastDataExpiringTime();
      break;
    case SNAPSHOTS_EXPIRING:
      lastCleanTime = cleanupState.getLastSnapshotsExpiringTime();
      break;
    default:
      // Operations without a tracked timestamp report "never cleaned".
      lastCleanTime = 0L;
      break;
  }
  return lastCleanTime;
}

/**
 * Record the run time of a cleanup operation in the stored cleanup state and commit it.
 *
 * <p>For an operation without a tracked timestamp (such as {@link CleanupOperation#NONE}) the
 * state is committed unchanged.
 *
 * @param operation the cleanup operation that was performed
 * @param time the time to record for the operation
 */
public void updateLastCleanTime(CleanupOperation operation, long time) {
  store()
      .begin()
      .updateState(CLEANUP_STATE_KEY, state -> applyCleanTime(state, operation, time))
      .commit();
}

/** Set the timestamp field matching {@code operation} on {@code state} and return the state. */
private static TableRuntimeCleanupState applyCleanTime(
    TableRuntimeCleanupState state, CleanupOperation operation, long time) {
  switch (operation) {
    case ORPHAN_FILES_CLEANING:
      state.setLastOrphanFilesCleanTime(time);
      break;
    case DANGLING_DELETE_FILES_CLEANING:
      state.setLastDanglingDeleteFilesCleanTime(time);
      break;
    case DATA_EXPIRING:
      state.setLastDataExpiringTime(time);
      break;
    case SNAPSHOTS_EXPIRING:
      state.setLastSnapshotsExpiringTime(time);
      break;
    default:
      // No timestamp is tracked for the remaining operations; leave the state untouched.
      break;
  }
  return state;
}

public void completeProcess(boolean success) {
OptimizingStatus originalStatus = getOptimizingStatus();
OptimizingType processType = optimizingProcess.getOptimizingType();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table.cleanup;

/**
 * Types of table cleanup operations. A scheduler reports its type so that the last run time of
 * that operation can be looked up and recorded per table.
 */
public enum CleanupOperation {
/** Cleaning of dangling delete files. */
DANGLING_DELETE_FILES_CLEANING,
/** Cleaning of orphan files. */
ORPHAN_FILES_CLEANING,
/** Expiring of data. */
DATA_EXPIRING,
/** Expiring of snapshots. */
SNAPSHOTS_EXPIRING,
/**
 * Marks operation types for which no cleanup process records are saved in the
 * table_runtime_state table.
 */
NONE;
}
Loading