Skip to content

Commit cdfcad3

Browse files
committed
Re-add trigger that checks if max-sequence-number on journal_persistence_ids is bigger, before updating the table.
1 parent 451fdca commit cdfcad3

File tree

8 files changed

+133
-10
lines changed

8 files changed

+133
-10
lines changed

core/src/test/resources/schema/postgres/nested-partitions-schema.sql

+23-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ CREATE TABLE IF NOT EXISTS public.snapshot
6565
PRIMARY KEY (persistence_id, sequence_number)
6666
);
6767

68+
DROP TRIGGER IF EXISTS trig_check_persistence_id_max_sequence_number ON public.journal_persistence_ids;
69+
DROP FUNCTION IF EXISTS public.check_persistence_id_max_sequence_number();
6870
DROP TRIGGER IF EXISTS trig_update_journal_persistence_ids ON public.journal;
6971
DROP FUNCTION IF EXISTS public.update_journal_persistence_ids();
7072
DROP TABLE IF EXISTS public.journal_persistence_ids;
@@ -85,8 +87,8 @@ BEGIN
8587
VALUES (NEW.persistence_id, NEW.sequence_number, NEW.ordering, NEW.ordering)
8688
ON CONFLICT (persistence_id) DO UPDATE
8789
SET
88-
max_sequence_number = GREATEST(public.journal_persistence_ids.max_sequence_number, NEW.sequence_number),
89-
max_ordering = GREATEST(public.journal_persistence_ids.max_ordering, NEW.ordering),
90+
max_sequence_number = NEW.sequence_number,
91+
max_ordering = NEW.ordering,
9092
min_ordering = LEAST(public.journal_persistence_ids.min_ordering, NEW.ordering);
9193

9294
RETURN NEW;
@@ -98,3 +100,22 @@ CREATE TRIGGER trig_update_journal_persistence_ids
98100
AFTER INSERT ON public.journal
99101
FOR EACH ROW
100102
EXECUTE PROCEDURE public.update_journal_persistence_ids();
103+
104+
CREATE OR REPLACE FUNCTION public.check_persistence_id_max_sequence_number() RETURNS TRIGGER AS
105+
$$
106+
DECLARE
107+
BEGIN
108+
IF NEW.max_sequence_number <= OLD.max_sequence_number THEN
109+
RAISE EXCEPTION 'New max_sequence_number not higher than previous value';
110+
END IF;
111+
112+
RETURN NEW;
113+
END;
114+
$$
115+
LANGUAGE plpgsql;
116+
117+
118+
CREATE TRIGGER trig_check_persistence_id_max_sequence_number
119+
BEFORE UPDATE ON public.journal_persistence_ids
120+
FOR EACH ROW
121+
EXECUTE PROCEDURE public.check_persistence_id_max_sequence_number();

core/src/test/resources/schema/postgres/partitioned-schema.sql

+23-2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ CREATE TABLE IF NOT EXISTS public.snapshot
6666
PRIMARY KEY (persistence_id, sequence_number)
6767
);
6868

69+
DROP TRIGGER IF EXISTS trig_check_persistence_id_max_sequence_number ON public.journal_persistence_ids;
70+
DROP FUNCTION IF EXISTS public.check_persistence_id_max_sequence_number();
6971
DROP TRIGGER IF EXISTS trig_update_journal_persistence_ids ON public.journal;
7072
DROP FUNCTION IF EXISTS public.update_journal_persistence_ids();
7173
DROP TABLE IF EXISTS public.journal_persistence_ids;
@@ -86,8 +88,8 @@ BEGIN
8688
VALUES (NEW.persistence_id, NEW.sequence_number, NEW.ordering, NEW.ordering)
8789
ON CONFLICT (persistence_id) DO UPDATE
8890
SET
89-
max_sequence_number = GREATEST(public.journal_persistence_ids.max_sequence_number, NEW.sequence_number),
90-
max_ordering = GREATEST(public.journal_persistence_ids.max_ordering, NEW.ordering),
91+
max_sequence_number = NEW.sequence_number,
92+
max_ordering = NEW.ordering,
9193
min_ordering = LEAST(public.journal_persistence_ids.min_ordering, NEW.ordering);
9294

9395
RETURN NEW;
@@ -99,3 +101,22 @@ CREATE TRIGGER trig_update_journal_persistence_ids
99101
AFTER INSERT ON public.journal
100102
FOR EACH ROW
101103
EXECUTE PROCEDURE public.update_journal_persistence_ids();
104+
105+
CREATE OR REPLACE FUNCTION public.check_persistence_id_max_sequence_number() RETURNS TRIGGER AS
106+
$$
107+
DECLARE
108+
BEGIN
109+
IF NEW.max_sequence_number <= OLD.max_sequence_number THEN
110+
RAISE EXCEPTION 'New max_sequence_number not higher than previous value';
111+
END IF;
112+
113+
RETURN NEW;
114+
END;
115+
$$
116+
LANGUAGE plpgsql;
117+
118+
119+
CREATE TRIGGER trig_check_persistence_id_max_sequence_number
120+
BEFORE UPDATE ON public.journal_persistence_ids
121+
FOR EACH ROW
122+
EXECUTE PROCEDURE public.check_persistence_id_max_sequence_number();

core/src/test/resources/schema/postgres/plain-schema.sql

+23-2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ CREATE TABLE IF NOT EXISTS public.snapshot
3939
PRIMARY KEY (persistence_id, sequence_number)
4040
);
4141

42+
DROP TRIGGER IF EXISTS trig_check_persistence_id_max_sequence_number ON public.journal_persistence_ids;
43+
DROP FUNCTION IF EXISTS public.check_persistence_id_max_sequence_number();
4244
DROP TRIGGER IF EXISTS trig_update_journal_persistence_ids ON public.journal;
4345
DROP FUNCTION IF EXISTS public.update_journal_persistence_ids();
4446
DROP TABLE IF EXISTS public.journal_persistence_ids;
@@ -59,8 +61,8 @@ BEGIN
5961
VALUES (NEW.persistence_id, NEW.sequence_number, NEW.ordering, NEW.ordering)
6062
ON CONFLICT (persistence_id) DO UPDATE
6163
SET
62-
max_sequence_number = GREATEST(public.journal_persistence_ids.max_sequence_number, NEW.sequence_number),
63-
max_ordering = GREATEST(public.journal_persistence_ids.max_ordering, NEW.ordering),
64+
max_sequence_number = NEW.sequence_number,
65+
max_ordering = NEW.ordering,
6466
min_ordering = LEAST(public.journal_persistence_ids.min_ordering, NEW.ordering);
6567

6668
RETURN NEW;
@@ -72,3 +74,22 @@ CREATE TRIGGER trig_update_journal_persistence_ids
7274
AFTER INSERT ON public.journal
7375
FOR EACH ROW
7476
EXECUTE PROCEDURE public.update_journal_persistence_ids();
77+
78+
CREATE OR REPLACE FUNCTION public.check_persistence_id_max_sequence_number() RETURNS TRIGGER AS
79+
$$
80+
DECLARE
81+
BEGIN
82+
IF NEW.max_sequence_number <= OLD.max_sequence_number THEN
83+
RAISE EXCEPTION 'New max_sequence_number not higher than previous value';
84+
END IF;
85+
86+
RETURN NEW;
87+
END;
88+
$$
89+
LANGUAGE plpgsql;
90+
91+
92+
CREATE TRIGGER trig_check_persistence_id_max_sequence_number
93+
BEFORE UPDATE ON public.journal_persistence_ids
94+
FOR EACH ROW
95+
EXECUTE PROCEDURE public.check_persistence_id_max_sequence_number();

migration/src/main/scala/akka/persistence/postgres/migration/journal/JournalSchema.scala

+22-2
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ private[journal] trait JournalSchema {
108108
VALUES (NEW.#$jPersistenceId, NEW.#$sequenceNumber, NEW.#$ordering, NEW.#$ordering)
109109
ON CONFLICT (#$persistenceId) DO UPDATE
110110
SET
111-
#$maxSequenceNumber = GREATEST(#$fullTableName.#$maxSequenceNumber, NEW.#$sequenceNumber),
112-
#$maxOrdering = GREATEST(#$fullTableName.#$maxOrdering, NEW.#$ordering),
111+
#$maxSequenceNumber = NEW.#$sequenceNumber,
112+
#$maxOrdering = NEW.#$ordering,
113113
#$minOrdering = LEAST(#$fullTableName.#$minOrdering, NEW.#$ordering);
114114
115115
RETURN NEW;
@@ -130,6 +130,26 @@ private[journal] trait JournalSchema {
130130
FOR EACH ROW
131131
EXECUTE PROCEDURE #$schema.update_journal_persistence_ids();
132132
"""
133+
134+
_ <- sqlu"""
135+
CREATE OR REPLACE FUNCTION #$schema.check_persistence_id_max_sequence_number() RETURNS TRIGGER AS $$$$
136+
DECLARE
137+
BEGIN
138+
IF NEW.#$maxSequenceNumber <= OLD.#$maxSequenceNumber THEN
139+
RAISE EXCEPTION 'New max_sequence_number not higher than previous value';
140+
END IF;
141+
142+
RETURN NEW;
143+
END;
144+
$$$$ LANGUAGE plpgsql;
145+
"""
146+
147+
_ <- sqlu"""
148+
CREATE TRIGGER trig_check_persistence_id_max_sequence_number
149+
BEFORE UPDATE ON #$fullTableName
150+
FOR EACH ROW
151+
EXECUTE PROCEDURE #$schema.check_persistence_id_max_sequence_number();
152+
"""
133153
} yield ()
134154
}
135155
}

migration/src/test/scala/akka/persistence/postgres/migration/MigrationTest.scala

+3
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ trait PrepareDatabase extends BeforeAndAfterEach with BeforeAndAfterAll with Sca
190190
_ <- sqlu"""DROP TABLE IF EXISTS migration.#$journalTableName"""
191191
_ <- sqlu"""DROP TRIGGER IF EXISTS trig_update_journal_persistence_ids ON migration.#$journalTableName"""
192192
_ <- sqlu"""DROP FUNCTION IF EXISTS migration.update_journal_persistence_ids()"""
193+
_ <-
194+
sqlu"""DROP TRIGGER IF EXISTS trig_check_persistence_id_max_sequence_number ON migration.#$journalPersistenceIdsTableName"""
195+
_ <- sqlu"""DROP FUNCTION IF EXISTS migration.check_persistence_id_max_sequence_number()"""
193196
_ <- sqlu"""DROP TABLE IF EXISTS migration.#$journalPersistenceIdsTableName"""
194197
_ <- sqlu"""CREATE TABLE IF NOT EXISTS migration.#$journalTableName
195198
(

scripts/migration-0.6.0/2-create-function-update-journal-persistence-ids.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ BEGIN
2626
jpi_table := schema || '.' || jpi_table_name;
2727
cols := jpi_persistence_id_column || ', ' || jpi_max_sequence_number_column || ', ' || jpi_max_ordering_column || ', ' || jpi_min_ordering_column;
2828
vals := '($1).' || j_persistence_id_column || ', ($1).' || j_sequence_number_column || ', ($1).' || j_ordering_column || ',($1).' || j_ordering_column;
29-
upds := jpi_max_sequence_number_column || ' = GREATEST(' || jpi_table || '.' || jpi_max_sequence_number_column || ', ($1).' || j_sequence_number_column || '), ' ||
30-
jpi_max_ordering_column || ' = GREATEST(' || jpi_table || '.' || jpi_max_ordering_column || ', ($1).' || j_ordering_column || '), ' ||
29+
upds := jpi_max_sequence_number_column || ' = ($1).' || j_sequence_number_column || ', ' ||
30+
jpi_max_ordering_column || ' = ($1).' || j_ordering_column || ', ' ||
3131
jpi_min_ordering_column || ' = LEAST(' || jpi_table || '.' || jpi_min_ordering_column || ', ($1).' || j_ordering_column || ')';
3232

3333
sql := 'INSERT INTO ' || jpi_table || ' (' || cols || ') VALUES (' || vals || ') ' ||
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
-- replace schema value if required
2+
CREATE OR REPLACE FUNCTION public.check_persistence_id_max_sequence_number() RETURNS TRIGGER AS
3+
$$
4+
DECLARE
5+
-- replace with appropriate values
6+
jpi_max_sequence_number_column CONSTANT TEXT := 'max_sequence_number';
7+
8+
-- variables
9+
sql TEXT;
10+
BEGIN
11+
sql := 'IF NEW.' || jpi_max_sequence_number_column || ' <= OLD.' || jpi_max_sequence_number_column || ' THEN
12+
RAISE EXCEPTION ''New max_sequence_number not higher than previous value'';
13+
END IF;';
14+
15+
EXECUTE sql USING NEW;
16+
RETURN NEW;
17+
END;
18+
$$ LANGUAGE plpgsql;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
DO $$
2+
DECLARE
3+
-- replace with appropriate values
4+
schema CONSTANT TEXT := 'public';
5+
jpi_table_name CONSTANT TEXT := 'journal_persistence_ids';
6+
7+
-- variables
8+
jpi_table TEXT;
9+
sql TEXT;
10+
BEGIN
11+
jpi_table := schema || '.' || jpi_table_name;
12+
13+
sql := 'CREATE TRIGGER trig_check_persistence_id_max_sequence_number
14+
BEFORE UPDATE ON ' || jpi_table || ' FOR EACH ROW
15+
EXECUTE PROCEDURE ' || schema || '.check_persistence_id_max_sequence_number()';
16+
17+
EXECUTE sql;
18+
END ;
19+
$$ LANGUAGE plpgsql;

0 commit comments

Comments
 (0)