Skip to content

Commit 2a833c8

Browse files
committed
WIP WIP WIP - Projections via listener(s)
1 parent ac58941 commit 2a833c8

File tree

7 files changed

+351
-0
lines changed

7 files changed

+351
-0
lines changed

repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@
77
import lombok.With;
88
import tech.ydb.yoj.ExperimentalApi;
99
import tech.ydb.yoj.repository.db.cache.TransactionLog;
10+
import tech.ydb.yoj.repository.db.listener.EntityEventListener;
11+
import tech.ydb.yoj.repository.db.listener.RepositoryTransactionListener;
12+
import tech.ydb.yoj.repository.db.projection.ProjectionListener;
1013

1114
import java.time.Duration;
15+
import java.util.List;
1216

1317
/**
1418
* Transaction options: isolation level, caching and logging settings.
@@ -45,14 +49,26 @@ public class TxOptions {
4549
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/162")
4650
QueryTracingFilter tracingFilter;
4751

52+
@NonNull
53+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/77")
54+
List<EntityEventListener> entityEventListeners;
55+
56+
@NonNull
57+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/77")
58+
List<RepositoryTransactionListener> repositoryTransactionListeners;
59+
4860
public static TxOptions create(@NonNull IsolationLevel isolationLevel) {
61+
var projectionListener = new ProjectionListener();
62+
4963
return builder()
5064
.isolationLevel(isolationLevel)
5165
// FIXME First-level cache is enabled by default (for backwards compatibility)
5266
// In the future, first-level cache will be off in read-only transactions unless explicitly enabled
5367
.firstLevelCache(true)
5468
.logLevel(TransactionLog.Level.DEBUG)
5569
.logStatementOnSuccess(true)
70+
.entityEventListeners(isolationLevel.isReadOnly() ? List.of() : List.of(projectionListener))
71+
.repositoryTransactionListeners(isolationLevel.isReadOnly() ? List.of() : List.of(projectionListener))
5672
.build();
5773
}
5874

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package tech.ydb.yoj.repository.db.listener;
2+
3+
import lombok.NonNull;
4+
import tech.ydb.yoj.repository.db.Entity;
5+
import tech.ydb.yoj.repository.db.TableDescriptor;
6+
7+
public interface EntityEventListener {
8+
<E extends Entity<E>> void onLoad(@NonNull TableDescriptor<E> tableDescriptor, @NonNull E entity);
9+
10+
<E extends Entity<E>> void onSave(@NonNull TableDescriptor<E> tableDescriptor, @NonNull E entity);
11+
12+
<E extends Entity<E>> void onDelete(@NonNull TableDescriptor<E> tableDescriptor, @NonNull E entity);
13+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package tech.ydb.yoj.repository.db.listener;
2+
3+
import lombok.NonNull;
4+
import tech.ydb.yoj.repository.db.RepositoryTransaction;
5+
6+
public interface RepositoryTransactionListener {
7+
default void onFlushWrites(@NonNull RepositoryTransaction transaction) {
8+
}
9+
10+
default void onCommit(@NonNull RepositoryTransaction transaction) {
11+
}
12+
13+
default void onRollback(@NonNull RepositoryTransaction transaction) {
14+
}
15+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package tech.ydb.yoj.repository.db.projection;
2+
3+
import tech.ydb.yoj.repository.db.Entity;
4+
5+
public interface EntityWithProjections<E extends EntityWithProjections<E>> extends Entity<E> {
6+
default ProjectionCollection collectProjections() {
7+
return ProjectionCollection.copyOf(Entity.super.createProjections());
8+
}
9+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package tech.ydb.yoj.repository.db.projection;
2+
3+
import lombok.NonNull;
4+
import tech.ydb.yoj.repository.db.Entity;
5+
import tech.ydb.yoj.repository.db.TableDescriptor;
6+
7+
public record Projection<E extends Entity<E>>(
8+
@NonNull TableDescriptor<E> tableDescriptor,
9+
@NonNull E entity
10+
) {
11+
@NonNull
12+
public Entity.Id<E> entityId() {
13+
return entity.getId();
14+
}
15+
16+
@NonNull
17+
public Key<E> key() {
18+
return new Key<>(tableDescriptor, entity.getId());
19+
}
20+
21+
public record Key<E extends Entity<E>>(
22+
@NonNull TableDescriptor<E> tableDescriptor,
23+
@NonNull Entity.Id<E> entityId
24+
) {
25+
}
26+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package tech.ydb.yoj.repository.db.projection;
2+
3+
import com.google.common.base.Preconditions;
4+
import lombok.NonNull;
5+
import tech.ydb.yoj.repository.db.Entity;
6+
import tech.ydb.yoj.repository.db.EntitySchema;
7+
import tech.ydb.yoj.repository.db.TableDescriptor;
8+
9+
import java.util.AbstractCollection;
10+
import java.util.Collection;
11+
import java.util.Iterator;
12+
import java.util.LinkedHashMap;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.Spliterator;
16+
import java.util.function.Consumer;
17+
import java.util.stream.Stream;
18+
19+
import static java.util.stream.Collectors.toList;
20+
21+
public final class ProjectionCollection extends AbstractCollection<Projection<?>> {
22+
private static final ProjectionCollection EMPTY = new ProjectionCollection(List.of());
23+
24+
private final List<Projection<?>> projections;
25+
26+
private ProjectionCollection(@NonNull List<Projection<?>> projections) {
27+
this.projections = List.copyOf(projections);
28+
}
29+
30+
@NonNull
31+
public static ProjectionCollection of() {
32+
return EMPTY;
33+
}
34+
35+
@NonNull
36+
public static ProjectionCollection copyOf(@NonNull Collection<? extends Entity<?>> projections) {
37+
return projections.isEmpty() ? EMPTY : builder().addAll(projections).build();
38+
}
39+
40+
@NonNull
41+
public static ProjectionCollection of(@NonNull Entity<?>... projections) {
42+
return projections.length == 0 ? EMPTY : builder().addAll(projections).build();
43+
}
44+
45+
@NonNull
46+
public static Builder builder() {
47+
return new Builder();
48+
}
49+
50+
@Override
51+
public int size() {
52+
return projections.size();
53+
}
54+
55+
@Override
56+
public boolean isEmpty() {
57+
return projections.isEmpty();
58+
}
59+
60+
@NonNull
61+
@Override
62+
public Stream<Projection<?>> stream() {
63+
return projections.stream();
64+
}
65+
66+
@NonNull
67+
@Override
68+
public Iterator<Projection<?>> iterator() {
69+
return projections.iterator();
70+
}
71+
72+
@Override
73+
public void forEach(Consumer<? super Projection<?>> action) {
74+
projections.forEach(action);
75+
}
76+
77+
@NonNull
78+
@Override
79+
public Spliterator<Projection<?>> spliterator() {
80+
return projections.spliterator();
81+
}
82+
83+
public static final class Builder {
84+
private final Map<Projection.Key<?>, Entity<?>> projections = new LinkedHashMap<>();
85+
86+
private Builder() {
87+
}
88+
89+
@SuppressWarnings("unchecked")
90+
public <E extends Entity<E>> Builder add(@NonNull E projection) {
91+
return add(TableDescriptor.from(EntitySchema.of(projection.getClass())), projection);
92+
}
93+
94+
@SuppressWarnings({"rawtypes", "unchecked"})
95+
public Builder addAll(@NonNull Entity<?>... projections) {
96+
for (Entity<?> e : projections) {
97+
this.add((Entity) e);
98+
}
99+
return this;
100+
}
101+
102+
@SuppressWarnings({"rawtypes", "unchecked"})
103+
public Builder addAll(@NonNull Collection</*@NonNull*/ ? extends Entity<?>> projections) {
104+
for (Entity<?> e : projections) {
105+
this.add((Entity) e);
106+
}
107+
return this;
108+
}
109+
110+
@NonNull
111+
public <E extends Entity<E>> Builder add(@NonNull TableDescriptor<E> tableDescriptor, @NonNull E projection) {
112+
var entityId = projection.getId();
113+
var key = new Projection.Key<>(tableDescriptor, entityId);
114+
Preconditions.checkArgument(!projections.containsKey(key), "Duplicate projection: ID=%s, table descriptor=%s", entityId, tableDescriptor);
115+
116+
projections.put(key, projection);
117+
118+
return this;
119+
}
120+
121+
@NonNull
122+
public ProjectionCollection build() {
123+
return new ProjectionCollection(projections.entrySet().stream().map(Builder::projection).collect(toList()));
124+
}
125+
126+
@SuppressWarnings({"unchecked", "rawtypes"})
127+
private static Projection<?> projection(@NonNull Map.Entry<Projection.Key<?>, ?> e) {
128+
TableDescriptor tableDescriptor = e.getKey().tableDescriptor();
129+
Entity entity = (Entity) e.getValue();
130+
return new Projection<>(tableDescriptor, entity);
131+
}
132+
}
133+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package tech.ydb.yoj.repository.db.projection;
2+
3+
import lombok.NonNull;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import tech.ydb.yoj.repository.db.Entity;
7+
import tech.ydb.yoj.repository.db.RepositoryTransaction;
8+
import tech.ydb.yoj.repository.db.TableDescriptor;
9+
import tech.ydb.yoj.repository.db.listener.EntityEventListener;
10+
import tech.ydb.yoj.repository.db.listener.RepositoryTransactionListener;
11+
12+
import java.util.LinkedHashMap;
13+
import java.util.Map;
14+
import java.util.stream.Stream;
15+
16+
import static java.util.stream.Collectors.toMap;
17+
18+
public final class ProjectionListener implements EntityEventListener, RepositoryTransactionListener {
19+
private static final Logger log = LoggerFactory.getLogger(ProjectionListener.class);
20+
21+
private final Map<Key<?>, Row<?>> rows = new LinkedHashMap<>();
22+
23+
@Override
24+
public <E extends Entity<E>> void onLoad(@NonNull TableDescriptor<E> tableDescriptor, @NonNull E entity) {
25+
row(tableDescriptor, entity).load(entity);
26+
}
27+
28+
@Override
29+
public <E extends Entity<E>> void onSave(@NonNull TableDescriptor<E> tableDescriptor, @NonNull E entity) {
30+
row(tableDescriptor, entity).save(entity);
31+
}
32+
33+
@Override
34+
public <E extends Entity<E>> void onDelete(@NonNull TableDescriptor<E> tableDescriptor, @NonNull E entity) {
35+
row(tableDescriptor, entity).delete();
36+
}
37+
38+
@SuppressWarnings("unchecked")
39+
private <E extends Entity<E>> Row<E> row(TableDescriptor<E> tableDescriptor, E entity) {
40+
return (Row<E>) rows.computeIfAbsent(new Key<>(tableDescriptor, entity.getId()), __ -> new Row<>());
41+
}
42+
43+
@Override
44+
public void onFlushWrites(@NonNull RepositoryTransaction transaction) {
45+
Map<Projection.Key<?>, Projection<?>> oldProjections = rows.values().stream()
46+
.flatMap(Row::projectionsBefore)
47+
.collect(toMap(Projection::key, e -> e, this::mergeOldProjections));
48+
49+
Map<Projection.Key<?>, Projection<?>> newProjections = rows.values().stream()
50+
.flatMap(Row::projectionsAfter)
51+
.collect(toMap(Projection::key, e -> e, this::mergeNewProjections));
52+
53+
for (Row<?> row : rows.values()) {
54+
row.flush();
55+
}
56+
57+
oldProjections.values().stream()
58+
.filter(e -> !newProjections.containsKey(e.key()))
59+
.forEach(e -> deleteEntity(transaction, e));
60+
newProjections.values().stream()
61+
.filter(e -> !e.equals(oldProjections.get(e.key())))
62+
.forEach(e -> saveEntity(transaction, e));
63+
}
64+
65+
private <E extends Entity<E>> void deleteEntity(RepositoryTransaction transaction, Projection<E> projection) {
66+
transaction.table(projection.tableDescriptor()).delete(projection.entityId());
67+
}
68+
69+
private <E extends Entity<E>> void saveEntity(RepositoryTransaction transaction, Projection<E> projection) {
70+
transaction.table(projection.tableDescriptor()).save(projection.entity());
71+
}
72+
73+
private Projection<?> mergeOldProjections(Projection<?> p1, Projection<?> p2) {
74+
if (p1 == p2 || p1.equals(p2)) {
75+
log.error("FIX THIS ASAP! Got two equal projections with the same ID: {}. NO exception is thrown so that "
76+
+ "you can just fix and migrate the entities to fix the projections", p1);
77+
return p1;
78+
}
79+
throw new IllegalStateException("Got two unequal projections with the same ID and table descriptor: p1=" + p1 + "; p2=" + p2);
80+
}
81+
82+
private Projection<?> mergeNewProjections(Projection<?> p1, Projection<?> p2) {
83+
throw new IllegalStateException("Got two projections with the same ID and table descriptor: p1=" + p1 + "; p2=" + p2);
84+
}
85+
86+
private record Key<E extends Entity<E>>(TableDescriptor<E> tableDescriptor, Entity.Id<E> entityId) {
87+
}
88+
89+
private static final class Row<E extends Entity<E>> {
90+
private Entity<E> loaded;
91+
private Entity<E> saved;
92+
private boolean writable;
93+
94+
public void load(Entity<E> entity) {
95+
if (loaded == null) {
96+
loaded = entity;
97+
}
98+
}
99+
100+
public void save(Entity<E> entity) {
101+
saved = entity;
102+
writable = true;
103+
}
104+
105+
public void delete() {
106+
saved = null;
107+
writable = true;
108+
}
109+
110+
public Stream<Projection<?>> projectionsBefore() {
111+
return streamProjections(loaded);
112+
}
113+
114+
public Stream<Projection<?>> projectionsAfter() {
115+
return streamProjections(saved);
116+
}
117+
118+
@NonNull
119+
private Stream<Projection<?>> streamProjections(Entity<E> entity) {
120+
if (writable && entity != null) {
121+
if (entity instanceof EntityWithProjections<?> ewp) {
122+
return ewp.collectProjections().stream();
123+
} else {
124+
return ProjectionCollection.copyOf(entity.createProjections()).stream();
125+
}
126+
} else {
127+
return Stream.empty();
128+
}
129+
}
130+
131+
public void flush() {
132+
if (writable) {
133+
loaded = saved;
134+
}
135+
saved = null;
136+
writable = false;
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)