33
33
34
34
#include " yb/yql/pggate/util/ybc_guc.h"
35
35
36
+ DEFINE_test_flag (bool , refresh_partitions_after_fetched_sample_blocks, false ,
37
+ " Force table partitions refresh after sample blocks are fetched." );
36
38
DEFINE_test_flag (int64, delay_after_table_analyze_ms, 0 ,
37
39
" Add this delay after each table is analyzed." );
38
40
@@ -56,6 +58,11 @@ class SampleRowsPickerIf {
56
58
57
59
namespace {
58
60
61
// Debug-string helper for a sample block protobuf: packs the block's lower_bound() and
// upper_bound() into a std::pair and delegates to AsDebugHexString on that pair.
+ std::string AsDebugHexString (const LWPgsqlSampleBlockPB& sample_block_pb) {
62
+ return AsDebugHexString (
63
+ std::make_pair (sample_block_pb.lower_bound (), sample_block_pb.upper_bound ()));
64
+ }
65
+
59
66
class PgDocSampleOp : public PgDocReadOp {
60
67
public:
61
68
struct SamplingStats {
@@ -80,13 +87,14 @@ class PgDocSampleOp : public PgDocReadOp {
80
87
template_read_req.has_sampling_state (), IllegalState,
81
88
" PgDocSampleOp is expected to have sampling state" );
82
89
SCHECK (
83
- sample_blocks_.empty () || !template_read_req.sampling_state ().is_blocks_sampling_stage (),
90
+ sorted_sample_blocks_.empty () ||
91
+ !template_read_req.sampling_state ().is_blocks_sampling_stage (),
84
92
IllegalState, " Sample blocks are not expected to be set for blocks sampling stage" );
85
93
86
94
// Sample blocks will be distributed across tablets/table partitions below.
87
95
std::optional<SampleBlocksFeed> sample_blocks_feed;
88
- if (!sample_blocks_ .empty ()) {
89
- sample_blocks_feed.emplace (sample_blocks_ );
96
+ if (!sorted_sample_blocks_ .empty ()) {
97
+ sample_blocks_feed.emplace (sorted_sample_blocks_ );
90
98
}
91
99
92
100
// Create one PgsqlOp per partition
@@ -114,6 +122,9 @@ class PgDocSampleOp : public PgDocReadOp {
114
122
VERIFY_RESULT (AssignSampleBlocks (
115
123
&read_req, partition_keys, partition, &sample_blocks_feed.value ()))) {
116
124
pgsql_ops_[partition]->set_active (true );
125
+ VLOG_WITH_PREFIX_AND_FUNC (3 )
126
+ << " Request #" << partition << " of " << partition_keys.size ()
127
+ << " for partition: " << Slice (partition_keys[partition]).ToDebugHexString ();
117
128
++active_op_count_;
118
129
}
119
130
}
@@ -174,8 +185,41 @@ class PgDocSampleOp : public PgDocReadOp {
174
185
const SamplingStats& GetSamplingStats () const { return sampling_stats_; }
175
186
176
187
// Stores the sample block boundaries (pairs of lower/upper key bounds) for later distribution
// across partitions. The argument is moved into the member, so the caller's vector is consumed.
// Returns IllegalState if the list is empty and InternalError if, after sorting by lower bound,
// a block starts before the previous block's upper bound; otherwise returns OK.
Status SetSampleBlocksBounds (std::vector<std::pair<KeyBuffer, KeyBuffer>>&& sample_blocks) {
177
- sample_blocks_ = std::move (sample_blocks);
178
- SCHECK (!sample_blocks_.empty (), IllegalState, " Sample blocks list should not be empty." );
188
+ sorted_sample_blocks_ = std::move (sample_blocks);
189
+ SCHECK (!sorted_sample_blocks_.empty (), IllegalState, " Sample blocks list should not be empty." );
190
+
191
// Order blocks by their lower bound (pair.first) only; upper bounds do not affect the order.
+ std::sort (
192
+ sorted_sample_blocks_.begin (), sorted_sample_blocks_.end (),
193
+ [](const std::pair<KeyBuffer, KeyBuffer>& b1, const std::pair<KeyBuffer, KeyBuffer>& b2) {
194
+ return b1.first < b2.first ;
195
+ });
196
+
197
// Dump the sorted list only when verbose level 3 is enabled, to avoid the formatting cost.
+ if (VLOG_IS_ON (3 )) {
198
+ size_t idx = 0 ;
199
+ for (const auto & sample_block : sorted_sample_blocks_) {
200
+ VLOG_WITH_FUNC (3 ) << " Sorted sample block #" << idx << " : "
201
+ << AsDebugHexString (sample_block);
202
+ ++idx;
203
+ }
204
+ }
205
+
206
// Validate that consecutive blocks do not overlap: each lower bound must not be less than the
// upper bound of the block before it. prev_upper_bound starts default-constructed.
// NOTE(review): if a non-final block has an empty upper bound, prev_upper_bound becomes empty
// and the following block presumably always passes this check - confirm that is intended.
+ Slice prev_upper_bound;
207
+ size_t idx = 0 ;
208
+ for (const auto & sample_block : sorted_sample_blocks_) {
209
+ if (sample_block.first .AsSlice () < prev_upper_bound) {
210
+ return STATUS_FORMAT (
211
+ InternalError, " Sorted sample block #$0: $1 starts before prev_upper_bound: $2" , idx,
212
+ AsDebugHexString (sample_block), AsDebugHexString (prev_upper_bound));
213
+ }
214
+ prev_upper_bound = sample_block.second .AsSlice ();
215
+ ++idx;
216
+ }
217
+
218
// Test-only hook: when the test flag is set, invalidate the cached table and reload it so that
// table_ is replaced after the sample blocks have been received.
// NOTE(review): CHECK_RESULT presumably aborts the process on a LoadTable failure instead of
// returning the error from this Status-returning function - confirm this is acceptable for the
// test-flag-gated path.
+ if (FLAGS_TEST_refresh_partitions_after_fetched_sample_blocks) {
219
+ const auto pg_table_id = table_->pg_table_id ();
220
+ pg_session_->InvalidateTableCache (pg_table_id, InvalidateOnPgClient::kTrue );
221
+ table_ = PgTable (CHECK_RESULT (pg_session_->LoadTable (pg_table_id)));
222
+ }
179
223
return Status::OK ();
180
224
}
181
225
@@ -184,18 +228,19 @@ class PgDocSampleOp : public PgDocReadOp {
184
228
class SampleBlocksFeed {
185
229
public:
186
230
// Binds the feed to the caller's list of sample blocks. The list is held by const reference
// (it is not copied or moved), so it must outlive this feed. The constructor positions the
// internal iterator at the first block and sets is_single_unbounded_block_ when the list is
// non-empty and that first block has both its lower and upper bound empty.
187
- explicit SampleBlocksFeed (const std::vector<std::pair<KeyBuffer, KeyBuffer>>& sample_blocks)
188
- : sample_blocks_(sample_blocks) {
189
- sample_block_iter_ = sample_blocks_.cbegin ();
190
- is_single_unbounded_block_ = sample_block_iter_ != sample_blocks_.cend () &&
231
+ explicit SampleBlocksFeed (
232
+ const std::vector<std::pair<KeyBuffer, KeyBuffer>>& sorted_sample_blocks)
233
+ : sorted_sample_blocks_(sorted_sample_blocks) {
234
+ sample_block_iter_ = sorted_sample_blocks_.cbegin ();
235
+ is_single_unbounded_block_ = sample_block_iter_ != sorted_sample_blocks_.cend () &&
191
236
sample_block_iter_->first .empty () &&
192
237
sample_block_iter_->second .empty ();
193
238
}
194
239
195
240
// Fetches sample block boundaries from internal storage until `exclusive_upper_bound` and
196
241
// assigns them to `dst`.
197
242
Status FetchTo (
198
- ::yb::ArenaList<::yb:: LWPgsqlSampleBlockPB>* dst, std::string exclusive_upper_bound) {
243
+ ::yb::ArenaList<LWPgsqlSampleBlockPB>* dst, const std::string& exclusive_upper_bound) {
199
244
if (is_single_unbounded_block_) {
200
245
// We should fully sample all tablets.
201
246
auto & sample_block_pb = *dst->Add ();
@@ -204,37 +249,92 @@ class PgDocSampleOp : public PgDocReadOp {
204
249
return Status::OK ();
205
250
}
206
251
207
- for (; sample_block_iter_ != sample_blocks_ .cend () &&
252
+ for (; sample_block_iter_ != sorted_sample_blocks_ .cend () &&
208
253
(exclusive_upper_bound.empty () ||
209
- sample_block_iter_->first .AsSlice () < exclusive_upper_bound);
254
+ (!sample_block_iter_->second .empty () &&
255
+ sample_block_iter_->second .AsSlice () <= exclusive_upper_bound));
210
256
sample_block_iter_++) {
211
-
212
- const auto cmp = sample_block_iter_->first .AsSlice ().compare (prev_upper_bound);
213
- if (cmp < 0 ) {
214
- return STATUS_FORMAT (
215
- InternalError, " Sample block: $0 starts before prev_upper_bound: $1" ,
216
- AsDebugHexString (*sample_block_iter_), AsDebugHexString (prev_upper_bound));
217
- }
218
- if (cmp == 0 && !dst->empty ()) {
219
- // Combine with the previous block.
220
- *dst->back ().mutable_upper_bound () = sample_block_iter_->second .AsSlice ();
257
+ LWPgsqlSampleBlockPB* sample_block_pb;
258
+
259
+ if (!override_next_block_lower_bound_.empty ()) {
260
+ SCHECK (
261
+ dst->empty (), InternalError,
262
+ Format (
263
+ " Expected dst (has $0 blocks) to be empty when override_next_block_lower_bound_ "
264
+ " is set "
265
+ " ($1)" ,
266
+ dst->size (), AsDebugHexString (override_next_block_lower_bound_)));
267
+ sample_block_pb = dst->Add ();
268
+ sample_block_pb->dup_lower_bound (override_next_block_lower_bound_.AsSlice ());
269
+ override_next_block_lower_bound_.clear ();
221
270
} else {
222
- auto & sample_block_pb = *dst->Add ();
223
- *sample_block_pb.mutable_lower_bound () = sample_block_iter_->first .AsSlice ();
224
- *sample_block_pb.mutable_upper_bound () = sample_block_iter_->second .AsSlice ();
271
+ sample_block_pb = AddOrUpdateBlock (dst, sample_block_iter_->first .AsSlice ());
225
272
}
226
273
227
- prev_upper_bound = sample_block_iter_->second .AsSlice ();
274
+ *sample_block_pb->mutable_upper_bound () = sample_block_iter_->second .AsSlice ();
275
+ RETURN_NOT_OK (OnLatestBlockBoundsSet (dst));
276
+ }
277
+
278
+ SCHECK (
279
+ !exclusive_upper_bound.empty () || sample_block_iter_ == sorted_sample_blocks_.cend (),
280
+ InternalError,
281
+ Format (
282
+ " Unexpected stop at sorted sample block $0 while exclusive_upper_bound is empty" ,
283
+ AsDebugHexString (*sample_block_iter_)));
284
+
285
+ if (sample_block_iter_ == sorted_sample_blocks_.cend () ||
286
+ sample_block_iter_->first .AsSlice () >= exclusive_upper_bound) {
287
+ return Status::OK ();
228
288
}
229
289
290
+ // A sample block might cross exclusive_upper_bound because the tablet has been split since
291
+ // block boundaries were calculated - split the block in this case.
292
+ VLOG_WITH_FUNC (1 )
293
+ << " Splitting the sample block: " << AsDebugHexString (*sample_block_iter_)
294
+ << " exclusive_upper_bound: " << AsDebugHexString (Slice (exclusive_upper_bound));
295
+
296
+ auto * sample_block_pb = AddOrUpdateBlock (dst, sample_block_iter_->first .AsSlice ());
297
+ sample_block_pb->dup_upper_bound (exclusive_upper_bound);
298
+ RETURN_NOT_OK (OnLatestBlockBoundsSet (dst));
299
+
300
+ override_next_block_lower_bound_ = exclusive_upper_bound;
301
+
230
302
return Status::OK ();
231
303
}
232
304
233
305
private:
234
- const std::vector<std::pair<KeyBuffer, KeyBuffer>>& sample_blocks_;
306
// Returns the protobuf block in `dst` that should receive the next upper bound.
// If `dst` is non-empty and its last block's upper bound equals the lower bound of the block at
// sample_block_iter_, the last block is returned so the caller can extend it (adjacent blocks
// are merged). Otherwise a new block is appended to `dst` with its lower bound set to
// `lower_bound`, and that new block is returned. The upper bound is left for the caller to set.
// NOTE(review): the merge test reads sample_block_iter_->first rather than the `lower_bound`
// parameter; the visible callers pass that same value, so the two agree today - confirm before
// calling this with any other lower bound (e.g. override_next_block_lower_bound_).
+ LWPgsqlSampleBlockPB* AddOrUpdateBlock (
307
+ ::yb::ArenaList<LWPgsqlSampleBlockPB>* dst, Slice lower_bound) {
308
+ if (!dst->empty () && dst->back ().upper_bound () == sample_block_iter_->first .AsSlice ()) {
309
+ // Update previous block to combine with the current one.
310
+ return &dst->back ();
311
+ }
312
+ auto * block = dst->Add ();
313
+ *block->mutable_lower_bound () = lower_bound;
314
+ return block;
315
+ }
316
+
317
// Post-condition check to run after both bounds of the last block in `dst` have been set.
// Logs the block at verbose level 2 and returns InternalError unless the block's upper bound is
// empty or its lower bound is strictly less than its upper bound. Returns OK otherwise.
// `dst` must be non-empty: the last element is accessed unconditionally via back().
+ Status OnLatestBlockBoundsSet (::yb::ArenaList<LWPgsqlSampleBlockPB>* dst) {
318
+ auto * sample_block_pb = &dst->back ();
319
+ VLOG_WITH_FUNC (2 ) << " Sample block at dst[" << dst->size () - 1 << " ]: "
320
+ << AsDebugHexString (*sample_block_pb);
321
// An empty upper bound is accepted without comparing it against the lower bound.
+ SCHECK (
322
+ sample_block_pb->upper_bound ().empty () ||
323
+ sample_block_pb->lower_bound () < sample_block_pb->upper_bound (),
324
+ InternalError,
325
+ Format (
326
+ " Wrong bounds order for sample block: $0" ,
327
+ AsDebugHexString (*sample_block_pb)));
328
+ return Status::OK ();
329
+ }
330
+
331
+ const std::vector<std::pair<KeyBuffer, KeyBuffer>>& sorted_sample_blocks_;
235
332
std::vector<std::pair<KeyBuffer, KeyBuffer>>::const_iterator sample_block_iter_;
236
- Slice prev_upper_bound;
237
333
bool is_single_unbounded_block_;
334
+ // Not empty iff we've split sample block at sample_block_iter_ during previous FetchTo call.
335
+ // In this case we use the rest of this block starting with override_next_block_lower_bound_
336
+ // for the next FetchTo call.
337
+ KeyBuffer override_next_block_lower_bound_;
238
338
};
239
339
240
340
Result<bool > AssignSampleBlocks (
@@ -262,7 +362,7 @@ class PgDocSampleOp : public PgDocReadOp {
262
362
263
363
const std::string LogPrefix () const { return log_prefix_; }
264
364
265
- std::vector<std::pair<KeyBuffer, KeyBuffer>> sample_blocks_ ;
365
+ std::vector<std::pair<KeyBuffer, KeyBuffer>> sorted_sample_blocks_ ;
266
366
SamplingStats sampling_stats_;
267
367
std::string log_prefix_;
268
368
};
@@ -387,20 +487,6 @@ class SampleBlocksPicker : public SamplePickerBase {
387
487
return Status::OK ();
388
488
}
389
489
390
- std::sort (
391
- blocks_reservoir_.begin (), blocks_reservoir_.end (),
392
- [](const std::pair<KeyBuffer, KeyBuffer>& b1, const std::pair<KeyBuffer, KeyBuffer>& b2) {
393
- return b1.first < b2.first ;
394
- });
395
-
396
- if (VLOG_IS_ON (3 )) {
397
- size_t idx = 0 ;
398
- for (const auto & sample_block : blocks_reservoir_) {
399
- VLOG_WITH_FUNC (3 ) << " Sorted sample block #" << idx << " : "
400
- << AsDebugHexString (sample_block);
401
- ++idx;
402
- }
403
- }
404
490
return Status::OK ();
405
491
}
406
492
0 commit comments