@@ -63,13 +63,11 @@ FROM json_each(?) e",
63
63
// language=SQLite
64
64
let supersede_statement = db. prepare_v2 (
65
65
"\
66
- UPDATE ps_oplog SET
67
- superseded = 1,
68
- op = 2,
69
- data = NULL
66
+ DELETE FROM ps_oplog
70
67
WHERE ps_oplog.superseded = 0
71
68
AND unlikely(ps_oplog.bucket = ?1)
72
- AND ps_oplog.key = ?2" ,
69
+ AND ps_oplog.key = ?2
70
+ RETURNING op_id, hash" ,
73
71
) ?;
74
72
supersede_statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
75
73
@@ -110,21 +108,39 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
110
108
111
109
last_op = Some ( op_id) ;
112
110
113
- let mut key: String = "" . to_string ( ) ;
114
-
115
111
if op == "PUT" || op == "REMOVE" {
112
+ let key: String ;
116
113
if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type. as_ref ( ) , object_id. as_ref ( ) ) {
117
114
let subkey = iterate_statement. column_text ( 6 ) . unwrap_or ( "null" ) ;
118
- let populated_key = format ! ( "{}/{}/{}" , & object_type, & object_id, subkey) ;
115
+ key = format ! ( "{}/{}/{}" , & object_type, & object_id, subkey) ;
116
+ } else {
117
+ key = String :: from ( "" ) ;
118
+ }
119
119
120
- supersede_statement. bind_text ( 2 , & populated_key, sqlite:: Destructor :: STATIC ) ?;
121
- supersede_statement. exec ( ) ?;
120
+ supersede_statement. bind_text ( 2 , & key, sqlite:: Destructor :: STATIC ) ?;
122
121
123
- key = populated_key;
122
+ let mut superseded = false ;
123
+
124
+ while ( supersede_statement. step ( ) ? == ResultCode :: ROW ) {
125
+ // Superseded (deleted) a previous operation, add the checksum
126
+ let supersede_checksum = supersede_statement. column_int ( 1 ) ?;
127
+ add_checksum = add_checksum. wrapping_add ( supersede_checksum) ;
128
+ // Superseded an operation, only skip if the bucket was empty
129
+ superseded = true ;
130
+ }
131
+ supersede_statement. reset ( ) ?;
132
+
133
+ let should_skip_remove = ( is_empty || !superseded) && op == "REMOVE" ;
134
+ if ( should_skip_remove) {
135
+ // If a REMOVE statement did not replace (supersede) any previous
136
+ // operations, we do not need to persist it.
137
+ // The same applies if the bucket was not synced to the local db yet,
138
+ // even if it did supersede another operation.
139
+ // Handle the same as MOVE.
140
+ add_checksum = add_checksum. wrapping_add ( checksum) ;
141
+ continue ;
124
142
}
125
- }
126
143
127
- if op == "PUT" || ( op == "REMOVE" && !is_empty) {
128
144
let opi = if op == "PUT" { 3 } else { 4 } ;
129
145
insert_statement. bind_int64 ( 2 , op_id) ?;
130
146
insert_statement. bind_int ( 3 , opi) ?;
@@ -149,7 +165,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
149
165
150
166
insert_statement. bind_int ( 8 , checksum) ?;
151
167
insert_statement. exec ( ) ?;
152
- } else if op == "MOVE" || ( is_empty && op == "REMOVE" ) {
168
+ } else if op == "MOVE" {
153
169
add_checksum = add_checksum. wrapping_add ( checksum) ;
154
170
} else if op == "CLEAR" {
155
171
// Any remaining PUT operations should get an implicit REMOVE
0 commit comments