diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeStore.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeStore.java index 3f4d082267..dc194ff1d3 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeStore.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeStore.java @@ -68,7 +68,6 @@ public DefaultTableRuntimeStore( Preconditions.checkNotNull(tableIdentifier, "ServerTableIdentifier must not be null."); Preconditions.checkNotNull(meta, "TableRuntimeMeta must not be null."); Preconditions.checkNotNull(requiredStates, "requiredStates must not be null."); - Preconditions.checkNotNull(restoredStates, "restoredStates must not be null."); this.tableIdentifier = tableIdentifier; this.meta = meta; restoreStates(requiredStates, restoredStates); @@ -136,37 +135,41 @@ public TableRuntimeOperation begin() { protected void restoreStates( List> requiredStates, List restoredStates) { - Map stateMap = - restoredStates.stream() - .collect(Collectors.toMap(TableRuntimeState::getStateKey, Function.identity())); - - requiredStates.forEach( - key -> { - if (stateMap.containsKey(key.getKey())) { - states.put(key.getKey(), stateMap.get(key.getKey())); - stateMap.remove(key.getKey()); - } else { - doAs( - TableRuntimeMapper.class, - mapper -> - mapper.saveState( - tableIdentifier.getId(), key.getKey(), key.serializeDefault())); - TableRuntimeState state = - getAs( - TableRuntimeMapper.class, - m -> m.getState(tableIdentifier.getId(), key.getKey())); - Preconditions.checkNotNull(state, "State %s initialize failed", key.getKey()); - states.put(key.getKey(), state); - } - }); - - if (!stateMap.isEmpty()) { - LOG.warn("Found {} useless runtime states for {}", stateMap.size(), tableIdentifier); - stateMap.forEach( - (k, s) -> { - LOG.warn("Remove useless runtime state {} for {}", k, tableIdentifier); - doAs(TableRuntimeMapper.class, m -> m.removeState(s.getStateId())); + if (restoredStates == null) { + doAs(TableRuntimeMapper.class, m -> m.saveState(meta.getTableId(), null, null)); + } else { + Map stateMap = + restoredStates.stream() + .collect(Collectors.toMap(TableRuntimeState::getStateKey, Function.identity())); + + requiredStates.forEach( + key -> { + if (stateMap.containsKey(key.getKey())) { + states.put(key.getKey(), stateMap.get(key.getKey())); + stateMap.remove(key.getKey()); + } else { + doAs( + TableRuntimeMapper.class, + mapper -> + mapper.saveState( + tableIdentifier.getId(), key.getKey(), key.serializeDefault())); + TableRuntimeState state = + getAs( + TableRuntimeMapper.class, + m -> m.getState(tableIdentifier.getId(), key.getKey())); + Preconditions.checkNotNull(state, "State %s initialize failed", key.getKey()); + states.put(key.getKey(), state); + } }); + + if (!stateMap.isEmpty()) { + LOG.warn("Found {} useless runtime states for {}", stateMap.size(), tableIdentifier); + stateMap.forEach( + (k, s) -> { + LOG.warn("Remove useless runtime state {} for {}", k, tableIdentifier); + doAs(TableRuntimeMapper.class, m -> m.removeState(s.getStateId())); + }); + } } }