Skip to content

Commit 1aee20c

Browse files
committed
Core, Hive: Double check commit status in case of commit conflict for NoLock (#12637)
1 parent aa44b04 commit 1aee20c

File tree

4 files changed

+70
-17
lines changed

4 files changed

+70
-17
lines changed

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -222,14 +222,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
222222
throw e;
223223

224224
} catch (Throwable e) {
225-
if (e.getMessage() != null && e.getMessage().contains(
226-
"The table has been modified. The parameter value for key '" +
227-
HiveTableOperations.METADATA_LOCATION_PROP +
228-
"' is")) {
229-
throw new CommitFailedException(
230-
e, "The table %s.%s has been modified concurrently", database, tableName);
231-
}
232-
233225
if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
234226
throw new RuntimeException(
235227
"Failed to acquire locks from metastore because the underlying metastore " +
@@ -238,12 +230,28 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
238230
e);
239231
}
240232

241-
LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
242-
database, tableName, e);
243233
commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
244-
commitStatus =
245-
BaseMetastoreOperations.CommitStatus.valueOf(
246-
checkCommitStatus(newMetadataLocation, metadata).name());
234+
if (e.getMessage() != null && e.getMessage().contains(
235+
"The table has been modified. The parameter value for key '" +
236+
HiveTableOperations.METADATA_LOCATION_PROP +
237+
"' is")) {
238+
// It's possible the HMS client incorrectly retries a successful operation, due to network
239+
// issue for example, and triggers this exception. So we need double-check to make sure
240+
// this is really a concurrent modification. Hitting this exception means no pending
241+
// requests, if any, can succeed later, so it's safe to check status in strict mode
242+
commitStatus = checkCommitStatusStrict(newMetadataLocation, metadata);
243+
if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
244+
throw new CommitFailedException(
245+
e, "The table %s.%s has been modified concurrently", database, tableName);
246+
}
247+
} else {
248+
LOG.error(
249+
"Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
250+
database,
251+
tableName,
252+
e);
253+
commitStatus = checkCommitStatus(newMetadataLocation, metadata);
254+
}
247255
switch (commitStatus) {
248256
case SUCCESS:
249257
break;

iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception {
4949
}
5050
}
5151

52-
metastore.start(hiveConfWithOverrides);
52+
metastore.start(hiveConfWithOverrides, 5, true);
5353
metastoreClient = new HiveMetaStoreClient(hiveConfWithOverrides);
5454
if (null != databaseName) {
5555
String dbPath = metastore.getDatabasePath(databaseName);

iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.iceberg.Schema;
4545
import org.apache.iceberg.Table;
4646
import org.apache.iceberg.TableMetadata;
47+
import org.apache.iceberg.TableProperties;
4748
import org.apache.iceberg.catalog.TableIdentifier;
4849
import org.apache.iceberg.exceptions.CommitFailedException;
4950
import org.apache.iceberg.hadoop.ConfigProperties;
@@ -66,6 +67,7 @@
6667
import static org.apache.iceberg.types.Types.NestedField.required;
6768
import static org.assertj.core.api.Assertions.assertThat;
6869
import static org.assertj.core.api.Assertions.assertThatThrownBy;
70+
import static org.mockito.ArgumentMatchers.anyString;
6971
import static org.mockito.Mockito.any;
7072
import static org.mockito.Mockito.atLeastOnce;
7173
import static org.mockito.Mockito.doAnswer;
@@ -206,6 +208,38 @@ public static void cleanup() {
206208
}
207209
}
208210

211+
@Test
212+
public void testMultipleAlterTableForNoLock() throws Exception {
213+
Table table = catalog.loadTable(TABLE_IDENTIFIER);
214+
table.updateProperties().set(TableProperties.HIVE_LOCK_ENABLED, "false").commit();
215+
spyOps.refresh();
216+
TableMetadata metadataV3 = spyOps.current();
217+
AtomicReference<Throwable> alterTableException = new AtomicReference<>(null);
218+
doAnswer(
219+
i -> {
220+
try {
221+
// mock a situation where alter table is unexpectedly invoked more than once
222+
i.callRealMethod();
223+
return i.callRealMethod();
224+
} catch (Throwable e) {
225+
alterTableException.compareAndSet(null, e);
226+
throw e;
227+
}
228+
})
229+
.when(spyClient)
230+
.alter_table_with_environmentContext(anyString(), anyString(), any(), any());
231+
spyOps.commit(metadataV3, metadataV1);
232+
verify(spyClient, times(1))
233+
.alter_table_with_environmentContext(anyString(), anyString(), any(), any());
234+
assertThat(alterTableException)
235+
.as("Expecting to trigger an exception indicating table has been modified")
236+
.hasValueMatching(
237+
t ->
238+
t.getMessage()
239+
.contains("The table has been modified. The parameter value for key '"));
240+
}
241+
242+
209243
@Test
210244
public void testLockAcquisitionAtFirstTime() throws TException, InterruptedException {
211245
doReturn(acquiredLockResponse).when(spyClient).lock(any());

iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,21 @@ public void start(HiveConf conf) {
140140
* @param poolSize The number of threads in the executor pool
141141
*/
142142
public void start(HiveConf conf, int poolSize) {
143+
start(conf, poolSize, false);
144+
}
145+
146+
/**
147+
* Starts a TestHiveMetastore with a provided connection pool size and HiveConf.
148+
*
149+
* @param conf The hive configuration to use
150+
* @param poolSize The number of threads in the executor pool
151+
* @param directSql Used to turn on directSql
152+
*/
153+
public void start(HiveConf conf, int poolSize, boolean directSql) {
143154
try {
144155
TServerSocket socket = new TServerSocket(0);
145156
int port = socket.getServerSocket().getLocalPort();
146-
initConf(conf, port);
157+
initConf(conf, port, directSql);
147158

148159
this.hiveConf = conf;
149160
this.server = newThriftServer(socket, poolSize, hiveConf);
@@ -243,12 +254,12 @@ private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf con
243254
return new TThreadPoolServer(args);
244255
}
245256

246-
private void initConf(HiveConf conf, int port) {
257+
private void initConf(HiveConf conf, int port, boolean directSql) {
247258
conf.set(HiveConf.ConfVars.METASTORE_URIS.varname, "thrift://localhost:" + port);
248259
conf.set(HiveConf.ConfVars.METASTORE_WAREHOUSE.varname, "file:" + HIVE_WAREHOUSE_DIR.getAbsolutePath());
249260
conf.set(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL.varname,
250261
"file:" + HIVE_EXTERNAL_WAREHOUSE_DIR.getAbsolutePath());
251-
conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
262+
conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, String.valueOf(directSql));
252263
conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false");
253264
conf.set("iceberg.hive.client-pool-size", "2");
254265
// set to false so that TxnManager#checkLock does not throw exception when using UNSET data type operation

0 commit comments

Comments
 (0)