Skip to content

Commit 021268d

Browse files
committed
#150: Support choosing spliterator implementation for streaming scan queries
...that is, for `YdbRepositoryTransaction.executeScanQuery()` returning `Stream<RESULT>`
1 parent 982cbc5 commit 021268d

File tree

6 files changed

+140
-27
lines changed

6 files changed

+140
-27
lines changed

repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2514,14 +2514,26 @@ public void scanUpdateFails() {
25142514
}
25152515

25162516
@Test
2517-
public void scanNotTruncated() {
2517+
public void scanNotTruncatedOldSpliterator() {
25182518
int maxPageSizeBiggerThatReal = 11_000;
25192519

25202520
db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach(
25212521
i -> db.projects().save(new Project(new Project.Id("id_" + i), "name"))
25222522
));
25232523

2524-
List<Project> result = db.scan().withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll());
2524+
List<Project> result = db.scan().useNewSpliterator(false).withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll());
2525+
assertEquals(maxPageSizeBiggerThatReal, result.size());
2526+
}
2527+
2528+
@Test
2529+
public void scanNotTruncatedNewSpliterator() {
2530+
int maxPageSizeBiggerThatReal = 11_000;
2531+
2532+
db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach(
2533+
i -> db.projects().save(new Project(new Project.Id("id_" + i), "name"))
2534+
));
2535+
2536+
List<Project> result = db.scan().useNewSpliterator(true).withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll());
25252537
assertEquals(maxPageSizeBiggerThatReal, result.size());
25262538
}
25272539

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,8 @@ public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement
298298
result = doCall(statement.toDebugString(params), () -> {
299299
if (options.isScan()) {
300300
return options.getScanOptions().isUseNewSpliterator()
301-
? doExecuteScanQueryList(statement, params)
302-
: doExecuteScanQueryLegacy(statement, params);
301+
? listScanQueryNew(statement, params)
302+
: listScanQueryLegacy(statement, params);
303303
} else {
304304
return doExecuteDataQuery(statement, params);
305305
}
@@ -371,7 +371,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESUL
371371
return new ResultSetConverter(resultSet).stream(statement::readResult).collect(toList());
372372
}
373373

374-
private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryLegacy(Statement<PARAMS, RESULT> statement, PARAMS params) {
374+
private <PARAMS, RESULT> List<RESULT> listScanQueryLegacy(Statement<PARAMS, RESULT> statement, PARAMS params) {
375375
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
376376
.withRequestTimeout(options.getScanOptions().getTimeout())
377377
.setMode(ExecuteScanQuerySettings.Mode.EXEC)
@@ -396,9 +396,9 @@ private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryLegacy(Statement<PARAMS,
396396
return result;
397397
}
398398

399-
private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryList(Statement<PARAMS, RESULT> statement, PARAMS params) {
399+
private <PARAMS, RESULT> List<RESULT> listScanQueryNew(Statement<PARAMS, RESULT> statement, PARAMS params) {
400400
List<RESULT> result = new ArrayList<>();
401-
try (Stream<RESULT> stream = executeScanQuery(statement, params)) {
401+
try (Stream<RESULT> stream = streamScanQueryNew(statement, params)) {
402402
stream.forEach(r -> {
403403
if (result.size() >= options.getScanOptions().getMaxSize()) {
404404
throw new ResultTruncatedException(
@@ -418,6 +418,12 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
418418
throw new IllegalStateException("Scan query can be used only from scan tx");
419419
}
420420

421+
return options.getScanOptions().isUseNewSpliterator()
422+
? streamScanQueryNew(statement, params)
423+
: streamScanQueryLegacy(statement, params);
424+
}
425+
426+
private <PARAMS, RESULT> Stream<RESULT> streamScanQueryNew(Statement<PARAMS, RESULT> statement, PARAMS params) {
421427
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
422428
.withRequestTimeout(options.getScanOptions().getTimeout())
423429
.setMode(ExecuteScanQuerySettings.Mode.EXEC)
@@ -437,6 +443,36 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
437443
return spliterator.createStream();
438444
}
439445

446+
private <PARAMS, RESULT> Stream<RESULT> streamScanQueryLegacy(Statement<PARAMS, RESULT> statement, PARAMS params) {
447+
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
448+
.withRequestTimeout(options.getScanOptions().getTimeout())
449+
.setMode(ExecuteScanQuerySettings.Mode.EXEC)
450+
.build();
451+
452+
String yql = getYql(statement);
453+
Params sdkParams = getSdkParams(statement, params);
454+
455+
try {
456+
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(false, action ->
457+
doCall(statement.toDebugString(params), () -> {
458+
Status status = YdbOperations.safeJoin(
459+
session.executeScanQuery(
460+
yql, sdkParams, settings,
461+
rs -> new ResultSetConverter(rs).stream(statement::readResult).forEach(action)
462+
),
463+
options.getScanOptions().getTimeout().plusMinutes(5)
464+
);
465+
validate("SCAN_QUERY: " + yql, status.getCode(), status.toString());
466+
})
467+
);
468+
return spliterator.makeStream();
469+
} catch (RepositoryException e) {
470+
throw e;
471+
} catch (Exception e) {
472+
throw new UnexpectedException("Could not perform scan query", e);
473+
}
474+
}
475+
440476
private QueryStatsCollectionMode getSdkStatsMode() {
441477
var queryStats = options.getQueryStats();
442478
return queryStats == null

repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import lombok.AllArgsConstructor;
1414
import lombok.Setter;
1515
import lombok.SneakyThrows;
16-
import lombok.Value;
1716
import lombok.experimental.Delegate;
1817
import org.assertj.core.api.Assertions;
1918
import org.junit.Assert;
@@ -87,6 +86,7 @@
8786
import tech.ydb.yoj.repository.ydb.sample.model.HintInt64Range;
8887
import tech.ydb.yoj.repository.ydb.sample.model.HintTablePreset;
8988
import tech.ydb.yoj.repository.ydb.sample.model.HintUniform;
89+
import tech.ydb.yoj.repository.ydb.statement.FindAllYqlStatement;
9090
import tech.ydb.yoj.repository.ydb.statement.FindStatement;
9191
import tech.ydb.yoj.repository.ydb.statement.YqlStatement;
9292
import tech.ydb.yoj.repository.ydb.table.YdbTable;
@@ -212,6 +212,18 @@ public void throwConversionExceptionOnSerializationProblem() {
212212

213213
@Test
214214
public void readYqlListAndMap() {
215+
record GroupByResult(
216+
String id,
217+
List<String> items,
218+
Map<String, String> map,
219+
220+
@Column(flatten = false)
221+
GroupByResult.Struct struct
222+
) {
223+
record Struct(String name) {
224+
}
225+
}
226+
215227
WithUnflattenableField entity = new WithUnflattenableField(
216228
new WithUnflattenableField.Id("id_yql_list"),
217229
new WithUnflattenableField.Unflattenable("Hello, world!", 100_500)
@@ -220,7 +232,7 @@ public void readYqlListAndMap() {
220232
db.tx(() -> {
221233
EntitySchema<WithUnflattenableField> schema = EntitySchema.of(WithUnflattenableField.class);
222234
var tableDescriptor = TableDescriptor.from(schema);
223-
List<GroupByResult> result = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction())
235+
List<GroupByResult> result = ydbRepositoryTransaction()
224236
.execute(new YqlStatement<>(tableDescriptor, schema, ObjectSchema.of(GroupByResult.class)) {
225237
@Override
226238
public String getQuery(String tablespace) {
@@ -247,20 +259,6 @@ public QueryType getQueryType() {
247259
});
248260
}
249261

250-
@Value
251-
static class GroupByResult {
252-
String id;
253-
List<String> items;
254-
Map<String, String> map;
255-
@Column(flatten = false)
256-
Struct struct;
257-
258-
@Value
259-
static class Struct {
260-
String name;
261-
}
262-
}
263-
264262
@Test
265263
public void readViewFromCache() {
266264
TypeFreak tf1 = newTypeFreak(0, "AAA1", "bbb");
@@ -935,15 +933,15 @@ public void creatingRepositoryDoesNotConnect() {
935933
public void ydbTransactionCompatibility() {
936934
db.tx(() -> {
937935
// No db tx or session yet!
938-
var sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
936+
var sdkTx = ydbRepositoryTransaction().toSdkTransaction();
939937
assertThatIllegalStateException().isThrownBy(sdkTx::getSessionId);
940938
assertThat(sdkTx.getId()).isNull();
941939
assertThat(sdkTx.getTxMode()).isEqualTo(TxMode.SERIALIZABLE_RW);
942940
assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(sdkTx::getStatusFuture);
943941

944942
// Perform any read - session and tx ID appear
945943
db.projects().countAll();
946-
sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
944+
sdkTx = ydbRepositoryTransaction().toSdkTransaction();
947945
assertThat(sdkTx.getSessionId()).isNotNull();
948946
assertThat(sdkTx.getId()).isNotNull();
949947
assertThat(sdkTx.getTxMode()).isEqualTo(TxMode.SERIALIZABLE_RW);
@@ -961,15 +959,15 @@ public void ydbTransactionCompatibility() {
961959

962960
db.readOnly().withStatementIsolationLevel(isolationLevel).run(() -> {
963961
// No db tx or session yet!
964-
var sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
962+
var sdkTx = ydbRepositoryTransaction().toSdkTransaction();
965963
assertThatIllegalStateException().isThrownBy(sdkTx::getSessionId);
966964
assertThat(sdkTx.getId()).isNull();
967965
assertThat(sdkTx.getTxMode()).isEqualTo(txMode);
968966
assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(sdkTx::getStatusFuture);
969967

970968
// Perform any read - session and tx ID appear
971969
db.projects().countAll();
972-
sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
970+
sdkTx = ydbRepositoryTransaction().toSdkTransaction();
973971
assertThat(sdkTx.getSessionId()).isNotNull();
974972
// Read transactions might have no ID or might have an ID, depending on your YDB version (that's what YDB returns, folks!)
975973
assertThat(sdkTx.getTxMode()).isEqualTo(txMode);
@@ -1260,6 +1258,53 @@ private void write(TopicClient topicClient, String topicPath, String producer,
12601258
}
12611259
}
12621260

1261+
public void streamingScanNotTruncatedOldSpliterator() {
1262+
int maxPageSizeBiggerThatReal = 11_000;
1263+
1264+
db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach(
1265+
i -> db.projects().save(new Project(new Project.Id("id_" + i), "name"))
1266+
));
1267+
1268+
List<Project.Id> result = db.scan().useNewSpliterator(false).withMaxSize(maxPageSizeBiggerThatReal).run(() -> {
1269+
var schema = EntitySchema.of(Project.class);
1270+
var desc = TableDescriptor.from(schema);
1271+
var statement = new FindAllYqlStatement<>(desc, schema, schema);
1272+
1273+
var projectIds = new ArrayList<Project.Id>();
1274+
try (var stream = ydbRepositoryTransaction().executeScanQuery(statement, null)) {
1275+
stream.forEach(p -> projectIds.add(p.getId()));
1276+
}
1277+
return projectIds;
1278+
});
1279+
assertEquals(maxPageSizeBiggerThatReal, result.size());
1280+
}
1281+
1282+
@Test
1283+
public void streamingScanNotTruncatedNewSpliterator() {
1284+
int maxPageSizeBiggerThatReal = 11_000;
1285+
1286+
db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach(
1287+
i -> db.projects().save(new Project(new Project.Id("id_" + i), "name"))
1288+
));
1289+
1290+
List<Project.Id> result = db.scan().useNewSpliterator(true).withMaxSize(maxPageSizeBiggerThatReal).run(() -> {
1291+
var schema = EntitySchema.of(Project.class);
1292+
var desc = TableDescriptor.from(schema);
1293+
var statement = new FindAllYqlStatement<>(desc, schema, schema);
1294+
1295+
var projectIds = new ArrayList<Project.Id>();
1296+
try (var stream = ydbRepositoryTransaction().executeScanQuery(statement, null)) {
1297+
stream.forEach(p -> projectIds.add(p.getId()));
1298+
}
1299+
return projectIds;
1300+
});
1301+
assertEquals(maxPageSizeBiggerThatReal, result.size());
1302+
}
1303+
1304+
private static YdbRepositoryTransaction<?> ydbRepositoryTransaction() {
1305+
return (YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction();
1306+
}
1307+
12631308
@AllArgsConstructor
12641309
private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.SchemeServiceImplBase {
12651310
@Delegate

repository/src/main/java/tech/ydb/yoj/repository/db/DelegatingTxManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ public ScanBuilder withTimeout(Duration timeout) {
210210
return new ScanBuilderImpl(delegate.withTimeout(timeout));
211211
}
212212

213+
@Override
214+
public ScanBuilder useNewSpliterator(boolean useNewSpliterator) {
215+
return new ScanBuilderImpl(delegate.useNewSpliterator(useNewSpliterator));
216+
}
217+
213218
@Override
214219
public <T> T run(Supplier<T> supplier) throws RetryableException {
215220
return doRunTx(() -> this.delegate.run(wrapTxBody(supplier)));

repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,11 @@ public ScanBuilder withTimeout(Duration timeout) {
372372
return withOptions(options.withTimeout(timeout));
373373
}
374374

375+
@Override
376+
public ScanBuilder useNewSpliterator(boolean useNewSpliterator) {
377+
return withOptions(options.withUseNewSpliterator(useNewSpliterator));
378+
}
379+
375380
@Override
376381
public <T> T run(Supplier<T> supplier) throws RetryableException {
377382
TxOptions txOptions = StdTxManager.this.options

repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tech.ydb.yoj.repository.db;
22

33
import lombok.NonNull;
4+
import tech.ydb.yoj.ExperimentalApi;
45
import tech.ydb.yoj.repository.db.cache.TransactionLog;
56
import tech.ydb.yoj.repository.db.exception.DeadlineExceededException;
67
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
@@ -199,6 +200,15 @@ interface ScanBuilder {
199200

200201
ScanBuilder withTimeout(Duration timeout);
201202

203+
/**
204+
* Specifies whether the new {@code Spliterator} implementation is used for streaming scan query results.
205+
* The new implementation better conforms to the {@code Spliterator} contract and consumes less memory.
206+
* <p>Note that using the new implementation currently has a negative performance impact; for more information, refer to
207+
* <a href="https://github.com/ydb-platform/yoj-project/issues/42">GitHub Issue #42</a>.
208+
*/
209+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
210+
ScanBuilder useNewSpliterator(boolean useNewSpliterator);
211+
202212
<T> T run(Supplier<T> supplier);
203213

204214
default void run(Runnable runnable) {

0 commit comments

Comments
 (0)