Skip to content

Commit 95b5e71

Browse files
committed
Avoid getLastMessageId RPC when calling hasMessageAvailable after seek by timestamp (#491)
(cherry picked from commit 15e0b00)
1 parent 77de765 commit 95b5e71

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

lib/ConsumerImpl.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1561,6 +1561,10 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
15611561
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
15621562

15631563
void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
1564+
if (!incomingMessages_.empty()) {
1565+
callback(ResultOk, true);
1566+
return;
1567+
}
15641568
bool compareMarkDeletePosition;
15651569
{
15661570
std::lock_guard<std::mutex> lock{mutexForMessageId_};
@@ -1726,6 +1730,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Se
17261730
hasSoughtByTimestamp_.store(true, std::memory_order_release);
17271731
} else {
17281732
seekMessageId_ = *boost::get<MessageId>(&seekArg);
1733+
hasSoughtByTimestamp_.store(false, std::memory_order_release);
17291734
}
17301735
seekStatus_ = SeekStatus::IN_PROGRESS;
17311736
seekCallback_ = std::move(callback);

tests/ReaderTest.cc

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
#include <pulsar/Reader.h>
2222
#include <time.h>
2323

24+
#include <future>
2425
#include <string>
26+
#include <thread>
2527

2628
#include "HttpHelper.h"
2729
#include "PulsarFriend.h"
@@ -850,7 +852,7 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
850852
ASSERT_FALSE(hasMessageAvailable);
851853
}
852854

853-
TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
855+
TEST_F(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
854856
using namespace std::chrono;
855857
const auto topic = "test-has-message-available-after-seek-timestamp-" + std::to_string(time(nullptr));
856858
Producer producer;
@@ -862,12 +864,10 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
862864

863865
auto createReader = [this, &topic](Reader& reader, const MessageId& msgId) {
864866
ASSERT_EQ(ResultOk, client.createReader(topic, msgId, {}, reader));
865-
if (GetParam()) {
866-
if (msgId == MessageId::earliest()) {
867-
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
868-
} else {
869-
EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
870-
}
867+
if (msgId == MessageId::earliest()) {
868+
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
869+
} else {
870+
EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
871871
}
872872
};
873873

@@ -886,6 +886,22 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
886886
ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
887887
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
888888
}
889+
890+
// Test `hasMessageAvailableAsync` will complete immediately if the incoming message queue is non-empty
891+
Reader reader;
892+
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));
893+
reader.seek(timestampBeforeSend);
894+
std::promise<std::thread::id> threadIdPromise;
895+
896+
waitUntil(seconds(3),
897+
[&reader] { return PulsarFriend::getConsumer(reader)->getNumOfPrefetchedMessages() > 0; });
898+
reader.hasMessageAvailableAsync([&threadIdPromise](Result result, bool hasMessageAvailable) {
899+
ASSERT_EQ(ResultOk, result);
900+
ASSERT_TRUE(hasMessageAvailable);
901+
threadIdPromise.set_value(std::this_thread::get_id());
902+
});
903+
auto threadId = threadIdPromise.get_future().get();
904+
ASSERT_EQ(threadId, std::this_thread::get_id());
889905
}
890906

891907
TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {

0 commit comments

Comments
 (0)