Skip to content

Commit

Permalink
CURATOR-719: Fix orSetData for parallel create calls (#510)
Browse files Browse the repository at this point in the history
  • Loading branch information
HoustonPutman authored Dec 12, 2024
1 parent 72fd977 commit ad19795
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1147,23 +1147,26 @@ public String call() throws Exception {

if (createdPath == null) {
try {
if (failBeforeNextCreateForTesting) {
failBeforeNextCreateForTesting = false;
throw new KeeperException.ConnectionLossException();
}
createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
} catch (KeeperException.NoNodeException e) {
if (createParentsIfNeeded) {
ZKPaths.mkdirs(
client.getZooKeeper(),
path,
false,
acling.getACLProviderForParents(),
createParentsAsContainers);
createdPath = client.getZooKeeper()
.create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
} else {
throw e;
try {
if (failBeforeNextCreateForTesting) {
failBeforeNextCreateForTesting = false;
throw new KeeperException.ConnectionLossException();
}
createdPath =
client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
} catch (KeeperException.NoNodeException e) {
if (createParentsIfNeeded) {
ZKPaths.mkdirs(
client.getZooKeeper(),
path,
false,
acling.getACLProviderForParents(),
createParentsAsContainers);
createdPath = client.getZooKeeper()
.create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
} else {
throw e;
}
}
} catch (KeeperException.NodeExistsException e) {
if (setDataIfExists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
Expand All @@ -33,6 +37,7 @@
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateBuilderMain;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
Expand Down Expand Up @@ -365,18 +370,37 @@ public void testCreateWithParentsWithoutAclInNamespaceBackground() throws Except
}

private void check(
CuratorFramework client, CreateBuilderMain builder, String path, byte[] data, boolean expectedSuccess)
CuratorFramework client,
PathAndBytesable<String> builder,
String path,
byte[] data,
boolean expectedSuccess)
throws Exception {
check(
client,
builder,
path,
data,
0,
(expectedSuccess) ? KeeperException.Code.OK : KeeperException.Code.NODEEXISTS);
}

private void check(
CuratorFramework client,
PathAndBytesable<String> builder,
String path,
byte[] data,
int expectedVersion,
KeeperException.Code expectedCode)
throws Exception {
int expectedCode =
(expectedSuccess) ? KeeperException.Code.OK.intValue() : KeeperException.Code.NODEEXISTS.intValue();
try {
builder.forPath(path, data);
assertEquals(expectedCode, KeeperException.Code.OK.intValue());
assertEquals(expectedCode, KeeperException.Code.OK);
Stat stat = new Stat();
byte[] actualData = client.getData().storingStatIn(stat).forPath(path);
assertTrue(IdempotentUtils.matches(0, data, stat.getVersion(), actualData));
assertTrue(IdempotentUtils.matches(expectedVersion, data, stat.getVersion(), actualData));
} catch (KeeperException e) {
assertEquals(expectedCode, e.getCode());
assertEquals(expectedCode.intValue(), e.getCode());
}
}

Expand Down Expand Up @@ -540,6 +564,65 @@ public void testIdempotentCreateConnectionLoss() throws Exception {
}
}

/**
* Tests all cases of create().orSetData()
*/
@Test
public void testOrSetData() throws Exception {
CuratorFramework client = createClient(new DefaultACLProvider());
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
try {
client.start();

Stat stat = new Stat();

String path = "/idpcreate";
String pathWithParents1 = "/idpcreate/1/a/b/c/d";
String pathWithParents2 = "/idpcreate/2/a/b/c/d";
byte[] data1 = new byte[] {1, 2, 3};
byte[] data2 = new byte[] {4, 5, 6};

// first and second create should succeed with the same path and different data
check(client, client.create().orSetData(), path, data1, 0, KeeperException.Code.OK);
check(client, client.create().orSetData(), path, data2, 1, KeeperException.Code.OK);
check(client, client.create(), path, data2, false);

// without creatingParentsIfNeeded, it should fail
check(client, client.create().orSetData(), pathWithParents1, data1, 0, KeeperException.Code.NONODE);

// with creatingParentsIfNeeded, it should succeed and succeed a second time as well
check(
client,
client.create().orSetData().creatingParentsIfNeeded(),
pathWithParents1,
data1,
0,
KeeperException.Code.OK);
check(client, client.create().orSetData(), pathWithParents1, data2, 1, KeeperException.Code.OK);

// Check that calling the same create().orSetData() in parallel is ok
Callable<Exception> setData = () -> {
try {
client.create().orSetData().creatingParentsIfNeeded().forPath(pathWithParents2, data2);
} catch (Exception e) {
return e;
}
return null;
};
Future<Exception> f1 = executor.submit(setData);
Future<Exception> f2 = executor.submit(setData);
assertNull(f1.get(), "Exception thrown during 1st parallel create");
assertNull(f2.get(), "Exception thrown during 2nd parallel create");

byte[] foundData = client.getData().storingStatIn(stat).forPath(pathWithParents2);
assertArrayEquals(data2, foundData, "Data does not match after parallel creates");
assertEquals(1, stat.getVersion(), "Version should be 1 after 2 parallel creates");
} finally {
CloseableUtils.closeQuietly(client);
executor.shutdownNow();
}
}

@Test
public void testCreateProtectedUtils() throws Exception {
try (CuratorFramework client = CuratorFrameworkFactory.builder()
Expand Down

0 comments on commit ad19795

Please sign in to comment.