diff --git a/.github/workflows/ci-documentbot.yml b/.github/workflows/ci-documentbot.yml index 1006661b60d2a..98e86eb6a07ad 100644 --- a/.github/workflows/ci-documentbot.yml +++ b/.github/workflows/ci-documentbot.yml @@ -36,7 +36,7 @@ jobs: if: (github.repository == 'apache/pulsar') && (github.event.pull_request.state == 'open') permissions: pull-requests: write - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 steps: - name: Labeling uses: apache/pulsar-test-infra/docbot@master diff --git a/.github/workflows/ci-go-functions.yaml b/.github/workflows/ci-go-functions.yaml index 9aaf1ea0958d4..e39f2fe48d4a2 100644 --- a/.github/workflows/ci-go-functions.yaml +++ b/.github/workflows/ci-go-functions.yaml @@ -38,7 +38,7 @@ env: jobs: preconditions: name: Preconditions - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 outputs: docs_only: ${{ steps.check_changes.outputs.docs_only }} steps: @@ -73,7 +73,7 @@ jobs: needs: preconditions if: ${{ needs.preconditions.outputs.docs_only != 'true' }} name: Go ${{ matrix.go-version }} Functions style check - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 strategy: matrix: go-version: ['1.23'] diff --git a/.github/workflows/ci-maven-cache-update.yaml b/.github/workflows/ci-maven-cache-update.yaml index 6b0310e487123..ac0f66e3837c1 100644 --- a/.github/workflows/ci-maven-cache-update.yaml +++ b/.github/workflows/ci-maven-cache-update.yaml @@ -59,7 +59,7 @@ jobs: matrix: include: - name: all modules - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 cache_name: 'm2-dependencies-all' mvn_arguments: '' @@ -68,7 +68,7 @@ jobs: cache_name: 'm2-dependencies-all' - name: core-modules - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 cache_name: 'm2-dependencies-core-modules' mvn_arguments: '-Pcore-modules,-main' diff --git a/.github/workflows/ci-owasp-dependency-check.yaml b/.github/workflows/ci-owasp-dependency-check.yaml index 9f6c90d359d2a..ce24b7bb10c53 100644 --- a/.github/workflows/ci-owasp-dependency-check.yaml +++ 
b/.github/workflows/ci-owasp-dependency-check.yaml @@ -35,7 +35,7 @@ jobs: env: JOB_NAME: Check ${{ matrix.branch }} DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 75 strategy: fail-fast: false diff --git a/.github/workflows/ci-pulsarbot.yaml b/.github/workflows/ci-pulsarbot.yaml index 4ea83404856d2..e0dabac1193ea 100644 --- a/.github/workflows/ci-pulsarbot.yaml +++ b/.github/workflows/ci-pulsarbot.yaml @@ -24,7 +24,7 @@ on: jobs: pulsarbot: - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 10 if: github.event_name == 'issue_comment' && contains(github.event.comment.body, '/pulsarbot') steps: diff --git a/.github/workflows/pulsar-ci-flaky.yaml b/.github/workflows/pulsar-ci-flaky.yaml index 0451b0e324471..19d723edd67d8 100644 --- a/.github/workflows/pulsar-ci-flaky.yaml +++ b/.github/workflows/pulsar-ci-flaky.yaml @@ -87,7 +87,7 @@ env: jobs: preconditions: name: Preconditions - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 outputs: docs_only: ${{ steps.check_changes.outputs.docs_only }} changed_tests: ${{ steps.changes.outputs.tests_files }} @@ -176,7 +176,7 @@ jobs: THREAD_LEAK_DETECTOR_DIR: ${{ github.workspace }}/target/thread-leak-dumps NETTY_LEAK_DETECTION: "${{ needs.preconditions.outputs.netty_leak_detection }}" NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 100 if: ${{ needs.preconditions.outputs.docs_only != 'true' }} steps: diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index c13fea5ba135a..dbbb1218d83df 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -87,7 +87,7 @@ env: jobs: preconditions: name: Preconditions - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 outputs: docs_only: ${{ steps.check_changes.outputs.docs_only }} changed_tests: ${{ steps.changes.outputs.tests_files }} @@ -169,7 +169,7 @@ jobs: 
JOB_NAME: Build and License check DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} CI_JDK_MAJOR_VERSION: ${{ needs.preconditions.outputs.jdk_major_version }} - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 60 if: ${{ needs.preconditions.outputs.docs_only != 'true' }} steps: @@ -252,7 +252,7 @@ jobs: THREAD_LEAK_DETECTOR_DIR: ${{ github.workspace }}/target/thread-leak-dumps NETTY_LEAK_DETECTION: "${{ needs.preconditions.outputs.netty_leak_detection }}" NETTY_LEAK_DUMP_DIR: ${{ github.workspace }}/target/netty-leak-dumps - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: ${{ matrix.timeout || 60 }} needs: ['preconditions', 'build-and-license-check'] if: ${{ needs.preconditions.outputs.docs_only != 'true' }} @@ -407,7 +407,7 @@ jobs: unit-tests-upload-coverage: name: CI - Unit - Upload Coverage - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 30 needs: ['preconditions', 'unit-tests'] env: @@ -489,7 +489,7 @@ jobs: pulsar-java-test-image: name: Build Pulsar java-test-image docker image - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 60 needs: ['preconditions', 'build-and-license-check'] if: ${{ needs.preconditions.outputs.docs_only != 'true'}} @@ -571,7 +571,7 @@ jobs: integration-tests: name: CI - Integration - ${{ matrix.name }} - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: ${{ matrix.timeout || 60 }} needs: ['preconditions', 'pulsar-java-test-image'] if: ${{ needs.preconditions.outputs.docs_only != 'true' }} @@ -774,7 +774,7 @@ jobs: integration-tests-upload-coverage: name: CI - Integration - Upload Coverage - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 30 needs: ['preconditions', 'integration-tests'] if: ${{ needs.preconditions.outputs.collect_coverage == 'true' }} @@ -861,7 +861,7 @@ jobs: delete-integration-test-docker-image-artifact: name: "Delete integration test docker image artifact" - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 
10 needs: [ 'preconditions', @@ -885,7 +885,7 @@ jobs: pulsar-test-latest-version-image: name: Build Pulsar docker image - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 60 needs: ['preconditions', 'build-and-license-check'] if: ${{ needs.preconditions.outputs.docs_only != 'true' }} @@ -1009,7 +1009,7 @@ jobs: system-tests: name: CI - System - ${{ matrix.name }} - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 60 needs: ['preconditions', 'pulsar-test-latest-version-image'] if: ${{ needs.preconditions.outputs.docs_only != 'true' }} @@ -1171,7 +1171,7 @@ jobs: system-tests-upload-coverage: name: CI - System - Upload Coverage - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 30 needs: ['preconditions', 'system-tests'] if: ${{ needs.preconditions.outputs.collect_coverage == 'true' }} @@ -1259,7 +1259,7 @@ jobs: flaky-system-tests: name: CI Flaky - System - ${{ matrix.name }} - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 60 needs: [ 'preconditions', 'pulsar-test-latest-version-image' ] if: ${{ needs.preconditions.outputs.docs_only != 'true' }} @@ -1398,7 +1398,7 @@ jobs: delete-system-test-docker-image-artifact: name: "Delete system test docker image artifact" - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 10 needs: [ 'preconditions', @@ -1459,7 +1459,7 @@ jobs: codeql: name: Run CodeQL Analysis - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 60 needs: ['preconditions', 'unit-tests'] if: ${{ (needs.preconditions.outputs.java_non_tests == 'true' || github.event_name != 'pull_request') && ((github.event_name == 'pull_request' && github.base_ref == 'master') || (github.event_name != 'pull_request' && github.ref_name == 'master')) }} @@ -1524,7 +1524,7 @@ jobs: owasp-dep-check: name: OWASP dependency check - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 120 needs: [ 'preconditions', 'integration-tests' ] if: ${{ needs.preconditions.outputs.need_owasp == 
'true' }} @@ -1630,7 +1630,7 @@ jobs: # this is to allow the workflow scheduled jobs to show as cancelled instead of failed since scheduled # jobs are not enabled for other than apache/pulsar repository. if: ${{ always() && !(cancelled() && github.repository != 'apache/pulsar' && github.event_name == 'schedule') }} - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 timeout-minutes: 10 needs: [ 'preconditions', diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh index 88b1566024a3d..4a1ad3904a205 100755 --- a/build/run_unit_group.sh +++ b/build/run_unit_group.sh @@ -174,7 +174,8 @@ function test_group_other() { -pl '!com.datastax.oss:distribution,!com.datastax.oss:pulsar-offloader-distribution,!com.datastax.oss:pulsar-server-distribution,!com.datastax.oss:pulsar-io-distribution,!com.datastax.oss:pulsar-all-docker-image,!com.datastax.oss:pulsar-experimental-docker-image' \ -PskipTestsForUnitGroupOther -DdisableIoMainProfile=true -DskipIntegrationTests \ -Dexclude='**/ManagedLedgerTest.java, - **/OffloadersCacheTest.java + **/OffloadersCacheTest.java, + **/OffsetsCacheTest.java, **/PrimitiveSchemaTest.java, **/BlobStoreManagedLedgerOffloaderTest.java, **/BlobStoreManagedLedgerOffloaderStreamingTest.java' @@ -184,6 +185,7 @@ function test_group_other() { mvn_test -pl tiered-storage/jcloud -Dinclude='**/BlobStoreManagedLedgerOffloaderTest.java' mvn_test -pl tiered-storage/jcloud -Dinclude='**/BlobStoreManagedLedgerOffloaderStreamingTest.java' + mvn_test -pl tiered-storage/jcloud -Dinclude='**/OffsetsCacheTest.java' echo "::endgroup::" local modules_with_quarantined_tests=$(git grep -l '@Test.*"quarantine"' | grep '/src/test/java/' | \ diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 4df6de54facd7..63e3e0a8d9abb 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -263,7 +263,7 @@ The Apache Software License, Version 2.0 * 
Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar * Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar * Fastutil -- it.unimi.dsi-fastutil-8.5.16.jar - * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.51.0.jar + * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.59.2.jar * Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar * Gson - com.google.code.gson-gson-2.8.9.jar @@ -431,26 +431,26 @@ The Apache Software License, Version 2.0 - org.jetbrains.kotlin-kotlin-stdlib-jdk8-1.8.20.jar - org.jetbrains-annotations-13.0.jar * gRPC - - io.grpc-grpc-all-1.72.0.jar - - io.grpc-grpc-auth-1.72.0.jar - - io.grpc-grpc-context-1.72.0.jar - - io.grpc-grpc-core-1.72.0.jar - - io.grpc-grpc-protobuf-1.72.0.jar - - io.grpc-grpc-protobuf-lite-1.72.0.jar - - io.grpc-grpc-stub-1.72.0.jar - - io.grpc-grpc-alts-1.72.0.jar - - io.grpc-grpc-api-1.72.0.jar - - io.grpc-grpc-grpclb-1.72.0.jar - - io.grpc-grpc-netty-shaded-1.72.0.jar - - io.grpc-grpc-services-1.72.0.jar - - io.grpc-grpc-xds-1.72.0.jar - - io.grpc-grpc-rls-1.72.0.jar - - io.grpc-grpc-servlet-1.72.0.jar - - io.grpc-grpc-servlet-jakarta-1.72.0.jar + - io.grpc-grpc-all-1.75.0.jar + - io.grpc-grpc-auth-1.75.0.jar + - io.grpc-grpc-context-1.75.0.jar + - io.grpc-grpc-core-1.75.0.jar + - io.grpc-grpc-protobuf-1.75.0.jar + - io.grpc-grpc-protobuf-lite-1.75.0.jar + - io.grpc-grpc-stub-1.75.0.jar + - io.grpc-grpc-alts-1.75.0.jar + - io.grpc-grpc-api-1.75.0.jar + - io.grpc-grpc-grpclb-1.75.0.jar + - io.grpc-grpc-netty-shaded-1.75.0.jar + - io.grpc-grpc-services-1.75.0.jar + - io.grpc-grpc-xds-1.75.0.jar + - io.grpc-grpc-rls-1.75.0.jar + - io.grpc-grpc-servlet-1.75.0.jar + - io.grpc-grpc-servlet-jakarta-1.75.0.jar - io.grpc-grpc-util-1.60.0.jar - - io.grpc-grpc-opentelemetry-1.72.0.jar - - io.grpc-grpc-gcp-csm-observability-1.72.0.jar - - io.grpc-grpc-inprocess-1.72.0.jar + - io.grpc-grpc-opentelemetry-1.75.0.jar + - io.grpc-grpc-gcp-csm-observability-1.75.0.jar + 
- io.grpc-grpc-inprocess-1.75.0.jar * Perfmark - io.perfmark-perfmark-api-0.26.0.jar * OpenCensus @@ -484,8 +484,8 @@ The Apache Software License, Version 2.0 * Prometheus - io.prometheus-simpleclient_httpserver-0.16.0.jar * Oxia - - io.github.oxia-db-oxia-client-api-0.6.2.jar - - io.github.oxia-db-oxia-client-0.6.2.jar + - io.github.oxia-db-oxia-client-api-0.7.0.jar + - io.github.oxia-db-oxia-client-0.7.0.jar * OpenHFT - net.openhft-zero-allocation-hashing-0.16.jar * Java JSON WebTokens @@ -541,7 +541,7 @@ The Apache Software License, Version 2.0 - io.opentelemetry.instrumentation-opentelemetry-runtime-telemetry-java8-1.33.6-alpha.jar - io.opentelemetry.semconv-opentelemetry-semconv-1.29.0-alpha.jar - com.google.cloud.opentelemetry-detector-resources-support-0.33.0.jar - - io.opentelemetry.contrib-opentelemetry-gcp-resources-1.43.0-alpha.jar + - io.opentelemetry.contrib-opentelemetry-gcp-resources-1.48.0-alpha.jar * Spotify completable-futures - com.spotify-completable-futures-0.3.6.jar * JSpecify diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index d82786a4d4cea..0769a8a8b59ba 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2586,7 +2586,8 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) { public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { for (ManagedCursor cursor : cursors) { - Position lastAckedPosition = cursor.getMarkDeletedPosition(); + Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null + ? 
cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition(); LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId())) .map(Map.Entry::getValue).orElse(null); @@ -3716,11 +3717,17 @@ public long getNumberOfEntries(Range range) { boolean toIncluded = range.upperBoundType() == BoundType.CLOSED; if (fromPosition.getLedgerId() == toPosition.getLedgerId()) { - // If the 2 positions are in the same ledger - long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1; - count += fromIncluded ? 1 : 0; - count += toIncluded ? 1 : 0; - return count; + LedgerInfo li = ledgers.get(toPosition.getLedgerId()); + if (li != null) { + // If the 2 positions are in the same ledger + long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1; + count += fromIncluded ? 1 : 0; + count += toIncluded ? 1 : 0; + return count; + } else { + // if the ledgerId is not in the ledgers, it means it has been deleted + return 0; + } } else { long count = 0; // If the from & to are pointing to different ledgers, then we need to : diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 913fd6bfe8a79..a78821b646a0d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -42,6 +42,7 @@ import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Range; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -2667,6 +2668,36 @@ public void testGetNumberOfEntriesInStorage() throws Exception { assertEquals(length, 
numberOfEntries); } + @Test + public void testGetNumberOfEntries() throws Exception { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setMaxEntriesPerLedger(5); + ManagedLedgerImpl managedLedger = + (ManagedLedgerImpl) factory.open("testGetNumberOfEntries", managedLedgerConfig); + // open cursor to prevent ledger to be deleted when ledger rollover + ManagedCursorImpl managedCursor = (ManagedCursorImpl) managedLedger.openCursor("cursor"); + int numberOfEntries = 10; + List positions = new ArrayList<>(numberOfEntries); + for (int i = 0; i < numberOfEntries; i++) { + positions.add(managedLedger.addEntry(("entry-" + i).getBytes(Encoding))); + } + Position mdPos = positions.get(numberOfEntries - 1); + Position rdPos = PositionFactory.create(mdPos.getLedgerId(), mdPos.getEntryId() + 1); + managedCursor.delete(positions); + // trigger ledger rollover and wait for the new ledger created + Awaitility.await().untilAsserted(() -> { + assertEquals("LedgerOpened", WhiteboxImpl.getInternalState(managedLedger, "state").toString()); + }); + managedLedger.rollCurrentLedgerIfFull(); + Awaitility.await().untilAsserted(() -> { + assertEquals(managedLedger.getLedgersInfo().size(), 1); + assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened); + }); + + long length = managedLedger.getNumberOfEntries(Range.closed(mdPos, rdPos)); + assertEquals(length, 0); + } + @Test public void testEstimatedBacklogSize() throws Exception { ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testEstimatedBacklogSize"); @@ -4453,4 +4484,153 @@ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry) assertEquals(ml.currentLedgerEntries, 0); }); } + + /** + * Verifies that ledger trimming respects the persistent cursor position, not just the in-memory position. + * + *

Test Flow: + *

    + *
  1. Setup: Create 60 entries across multiple ledgers (10 entries per ledger) + *
  2. Initial Acks: Delete entries 0, 5-9 and wait for persistence + *
    • Persistent position: entry 0
    • In-memory position: entry 0
    + *
  3. Inject Delay: Add 30-second delay to BookKeeper writes (simulates slow ZK/BK) + *
  4. Delayed Acks: Asynchronously delete entries 1-4 + *
    • Persistent position: entry 0 (delayed)
    • In-memory position: entry 9
    + *
  5. Pre-Trim Sync: Call {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()} + *
  6. Trigger Trim: Start ledger trimming process + *
  7. Verify: First ledger is preserved because persistent position (entry 0) still points to it + *
+ * + *

Success Criteria: + * The first ledger must NOT be deleted, preventing the cursor from pointing to a non-existent + * ledger after topic reload. This avoids negative backlog calculations. + * + *

What This Tests: + * Ensures that {@code maybeUpdateCursorBeforeTrimmingConsumedLedger()} correctly uses the + * persistent cursor position (not in-memory) when determining which ledgers are safe to trim. + */ + @Test + public void testCursorPointsToDeletedLedgerAfterTrim() throws Exception { + final String ledgerName = "testCursorPointsToDeletedLedgerAfterTrimAndReload"; + final String cursorName = "test-cursor"; + + // ===== SETUP: Create managed ledger with small ledgers ===== + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName); + + // ===== PHASE 1: Write entries to create multiple ledgers ===== + int totalEntries = 60; + log.info("=== PHASE 1: Writing {} entries to create multiple ledgers ===", totalEntries); + for (int i = 0; i < totalEntries; i++) { + Position pos = ledger.addEntry(("message-" + i).getBytes()); + log.info("Added entry: {}", pos); + } + + List ledgersAfterWrite = ledger.getLedgersInfoAsList(); + log.info("Created {} ledgers: {}", ledgersAfterWrite.size(), + ledgersAfterWrite.stream() + .map(l -> String.format("L%d(%d entries)", l.getLedgerId(), l.getEntries())) + .toArray()); + + assertTrue(ledgersAfterWrite.size() >= 5, "Should have at least 5 ledgers"); + long firstLedgerId = ledgersAfterWrite.get(0).getLedgerId(); + + // ===== PHASE 2: Initial acknowledgments (entries 0, 5-9) and wait for persistence ===== + log.info("=== PHASE 2: Acknowledging initial entries in first ledger {} ===", firstLedgerId); + List entries = cursor.readEntries(10); + + // Delete entries 5-9 first (out of order) + log.info("Deleting entries 5-9"); + for (int i = 5; i < 10; i++) { + cursor.delete(entries.get(i).getPosition()); + } + + // Delete entry 0, which advances mark-delete position + log.info("Deleting entry 0 - this advances mark-delete position"); + 
cursor.delete(entries.get(0).getPosition()); + + // Verify in-memory cursor position + Position initialMarkDelete = cursor.getMarkDeletedPosition(); + assertEquals(initialMarkDelete.getLedgerId(), firstLedgerId, + "Mark-delete should be in first ledger"); + assertEquals(initialMarkDelete.getEntryId(), entries.get(0).getEntryId(), + "Mark-delete should be at entry 0"); + + // Wait for this position to be persisted + log.info("Waiting for initial mark-delete position to persist: {}", initialMarkDelete); + Awaitility.await().untilAsserted(() -> { + assertEquals(cursor.getPersistentMarkDeletedPosition(), initialMarkDelete, + "Persistent position should catch up to in-memory position"); + }); + log.info("Initial position persisted successfully"); + + // ===== PHASE 3: Inject delay to simulate slow persistence ===== + long delay = 30; + log.info("=== PHASE 3: Injecting {}s delay for cursor persistence ===", + delay); + bkc.addEntryResponseDelay(delay, TimeUnit.SECONDS); + + // ===== PHASE 4: Asynchronously acknowledge entries 1-4 (persistence will be delayed) ===== + log.info("=== PHASE 4: Asynchronously acknowledging entries 1-4 (will be delayed) ==="); + for (int i = 1; i < 5; i++) { + final int index = i; + cursor.asyncDelete(entries.get(i).getPosition(), new AsyncCallbacks.DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + log.info("Entry {} deletion completed", index); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + log.error("Entry {} deletion failed", index, exception); + } + }, null); + } + + // Verify in-memory position has advanced to entry 9 + Position newMarkDelete = cursor.getMarkDeletedPosition(); + assertEquals(newMarkDelete.getLedgerId(), firstLedgerId, + "Mark-delete should still be in first ledger"); + assertEquals(newMarkDelete.getEntryId(), entries.get(9).getEntryId(), + "Mark-delete should have advanced to entry 9 (in-memory)"); + log.info("In-memory mark-delete position: {}", 
newMarkDelete); + + // ===== PHASE 5: Update cursor before trimming (important synchronization point) ===== + log.info("=== PHASE 5: Calling maybeUpdateCursorBeforeTrimmingConsumedLedger ==="); + ledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); + + // ===== PHASE 6: Trigger ledger trimming ===== + log.info("=== PHASE 6: Triggering ledger trimming ==="); + CompletableFuture trimFuture = new CompletableFuture<>(); + ledger.trimConsumedLedgersInBackground(trimFuture); + trimFuture.get(); + log.info("Trimming completed"); + + // ===== VERIFICATION: Ledgers should NOT be trimmed ===== + log.info("=== VERIFICATION ==="); + + // Persistent position should still be at old position (entry 0) + Position persistentPosition = cursor.getPersistentMarkDeletedPosition(); + assertEquals(persistentPosition, initialMarkDelete, + "Persistent position should not have advanced (delayed)"); + log.info("Persistent mark-delete position (as expected): {}", persistentPosition); + log.info("In-memory mark-delete position: {}", newMarkDelete); + + // First ledger should still exist (not trimmed) + Awaitility.await().untilAsserted(() -> { + long firstRemainingLedger = ledger.getFirstPosition().getLedgerId(); + assertEquals(firstRemainingLedger, ledgersAfterWrite.get(0).getLedgerId(), + "First ledger should NOT be trimmed because persistent cursor position " + + "is still pointing to it (entry 0)"); + }); + log.info("SUCCESS: First ledger {} was correctly preserved", firstLedgerId); + + // ===== CLEANUP ===== + entries.forEach(Entry::release); + cursor.close(); + ledger.close(); + } } diff --git a/pip/pip-344.md b/pip/pip-344.md index 5eafc6fd5c279..120a8e1ef0d4a 100644 --- a/pip/pip-344.md +++ b/pip/pip-344.md @@ -122,6 +122,9 @@ message FeatureFlags { # Backward & Forward Compatibility -- Old version client and New version Broker: The client will call the old API. +Old version (`< 3.0.6`) client and New version (`>= 3.0.6`) Broker: The client will call the old API. 
-- New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-support error if the param `createIfAutoCreationEnabled` is false. +New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-support error if the param `createIfAutoCreationEnabled` is false in the following cases: +- The topic is a DLQ topic +- The topic is non-persistent +- The topic is geo-replicated, and the local cluster runs the new version while the remote cluster runs the old version diff --git a/pom.xml b/pom.xml index 4ca0a5f2ab274..6317382aedaa1 100644 --- a/pom.xml +++ b/pom.xml @@ -216,7 +216,7 @@ flexible messaging model and an intuitive client API. 1.17 3.25.5 ${protobuf3.version} - 1.72.0 + 1.75.0 1.41.0 0.26.0 ${grpc.version} @@ -294,7 +294,7 @@ flexible messaging model and an intuitive client API. 
4.5.13 4.4.15 0.7.7 - 0.6.2 + 0.7.0 2.0 1.10.12 5.5.0 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java index 25a0a2752d11b..4af86c1d71de8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java @@ -59,11 +59,11 @@ public void run() { if (isCancel) { return; } - if (factory instanceof ManagedLedgerFactoryImpl - && !((ManagedLedgerFactoryImpl) factory).isMetadataServiceAvailable()) { - return; - } try { + if (factory instanceof ManagedLedgerFactoryImpl + && !((ManagedLedgerFactoryImpl) factory).isMetadataServiceAvailable()) { + return; + } loadManager.get().doLoadShedding(); } catch (Exception e) { LOG.warn("Error during the load shedding", e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 8247e566606c7..8e992095c5222 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -460,7 +460,32 @@ private CompletableFuture> getActiveOwnerAsync( String serviceUnit, ServiceUnitState state, Optional owner) { - + // When the channel is disabled/closed, do not perform liveness verification, return according to the status: + if (channelState == Disabled || channelState == Closed) { + switch (state) { + // Owned/Splitting: Directly return owner (for isOwner judgment as true) + case Owned: + case Splitting: + return CompletableFuture.completedFuture(owner); + case Assigning: + case Releasing: + if 
(owner.isPresent()) { + if (isTargetBroker(owner.get())) { + // This machine is the target taker, + // return an unfinished future with "waiting for ownership" + return dedupeGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable); + } else { + // The target is another broker, return directly so that the upper layer can redirect + return CompletableFuture.completedFuture(owner); + } + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + // Other status: return empty + default: + return CompletableFuture.completedFuture(Optional.empty()); + } + } // If this broker's registry does not exist(possibly suffering from connecting to the metadata store), // we return the owner without its activeness check. // This broker tries to serve lookups on a best efforts basis when metadata store connection is unstable. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index a580c7500b7d0..b5dbddc2ea377 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -43,9 +43,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -1225,35 +1223,6 @@ public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName())); } - /** - * @deprecated This method is only used in test now. 
- */ - @Deprecated - public boolean isServiceUnitActive(TopicName topicName) { - try { - return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig() - .getMetadataStoreOperationTimeoutSeconds(), SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e); - throw new RuntimeException(e); - } - } - - public CompletableFuture isServiceUnitActiveAsync(TopicName topicName) { - // TODO: Add unit tests cover it. - if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { - return getBundleAsync(topicName) - .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); - } - return getBundleAsync(topicName).thenCompose(bundle -> { - Optional> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle); - if (optionalFuture.isEmpty()) { - return CompletableFuture.completedFuture(false); - } - return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive()); - }); - } - private CompletableFuture isNamespaceOwnedAsync(NamespaceName fqnn) { // TODO: Add unit tests cover it. 
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8fd18b29991bd..0a8add24a4d32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -80,7 +80,6 @@ import java.util.function.Consumer; import java.util.function.Predicate; import lombok.AccessLevel; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; import org.apache.bookkeeper.common.util.OrderedExecutor; @@ -96,6 +95,7 @@ import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; import org.apache.pulsar.broker.PulsarServerException; @@ -195,6 +195,7 @@ import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1042,7 +1043,7 @@ public CompletableFuture> getTopic(final String topic, boolean c } public CompletableFuture> getTopic(final String topic, boolean createIfMissing, - Map properties) { + @Nullable Map properties) { return getTopic(TopicName.get(topic), createIfMissing, properties); } @@ -1062,7 +1063,7 @@ public CompletableFuture> getTopic(final String topic, boolean c * @return CompletableFuture with an Optional of the topic if found or created, otherwise empty. 
*/ public CompletableFuture> getTopic(final TopicName topicName, boolean createIfMissing, - Map properties) { + @Nullable Map properties) { try { // If topic future exists in the cache returned directly regardless of whether it fails or timeout. CompletableFuture> tp = topics.get(topicName.toString()); @@ -1078,22 +1079,32 @@ public CompletableFuture> getTopic(final TopicName topicName, bo return FutureUtil.failedFuture(new NotAllowedException( "Broker is unable to load persistent topic")); } - return checkNonPartitionedTopicExists(topicName).thenCompose(exists -> { + final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( + Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), + () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); + final var context = new TopicLoadingContext(topicName, createIfMissing, topicFuture); + if (properties != null) { + context.setProperties(properties); + } + topicFuture.exceptionally(t -> { + final var now = System.nanoTime(); + if (FutureUtil.unwrapCompletionException(t) instanceof TimeoutException) { + log.warn("Failed to load {} after {} ms", topicName, context.latencyMs(now)); + } else { + log.warn("Failed to load {} after {} ms", topicName, context.latencyString(now), t); + } + pulsarStats.recordTopicLoadFailed(); + return Optional.empty(); + }); + checkNonPartitionedTopicExists(topicName).thenAccept(exists -> { if (!exists && !createIfMissing) { - return CompletableFuture.completedFuture(Optional.empty()); + topicFuture.complete(Optional.empty()); + return; } // The topic level policies are not needed now, but the meaning of calling // "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization. 
- return getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY) - .exceptionally(ex -> { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - final String errorInfo = String.format("Topic creation encountered an exception by initialize" - + " topic policies service. topic_name=%s error_message=%s", topicName, - rc.getMessage()); - log.error(errorInfo, rc); - throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); - }).thenCompose(optionalTopicPolicies -> { - final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); + getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY) + .thenCompose(optionalTopicPolicies -> { if (topicName.isPartitioned()) { final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); return fetchPartitionedTopicMetadataAsync(topicNameEntity) @@ -1103,8 +1114,7 @@ public CompletableFuture> getTopic(final TopicName topicName, bo if (metadata.partitions == 0 || topicName.getPartitionIndex() < metadata.partitions) { return topics.computeIfAbsent(topicName.toString(), (tpName) -> - loadOrCreatePersistentTopic(tpName, - createIfMissing, properties)); + loadOrCreatePersistentTopic(context)); } else { final String errorMsg = String.format("Illegal topic partition name %s with max allowed " @@ -1116,10 +1126,44 @@ public CompletableFuture> getTopic(final TopicName topicName, bo }); } else { return topics.computeIfAbsent(topicName.toString(), (tpName) -> - loadOrCreatePersistentTopic(tpName, createIfMissing, properties)); + loadOrCreatePersistentTopic(context)); + } + }).thenRun(() -> { + final var inserted = new MutableBoolean(false); + final var cachedFuture = topics.computeIfAbsent(topicName.toString(), ___ -> { + inserted.setTrue(); + return loadOrCreatePersistentTopic(context); + }); + if (inserted.isFalse()) { + // This case should happen rarely when the same topic is loaded concurrently because we + // 
checked if the `topics` cache includes this topic before, so the latency is not the + // actual loading latency that should not be recorded in metrics. + log.info("[{}] Finished loading from other concurrent loading task (latency: {})", + topicName, context.latencyString(System.nanoTime())); + cachedFuture.whenComplete((optTopic, e) -> { + if (e == null) { + topicFuture.complete(optTopic); + } else { + topicFuture.completeExceptionally(e); + } + }); } + }).exceptionally(e -> { + pulsar.getExecutor().execute(() -> topics.remove(topicName.toString(), topicFuture)); + final Throwable rc = FutureUtil.unwrapCompletionException(e); + final String errorInfo = String.format("Topic creation encountered an exception by initialize" + + " topic policies service. topic_name=%s error_message=%s", topicName, + rc.getMessage()); + log.error(errorInfo, rc); + topicFuture.completeExceptionally(rc); + return null; }); + }).exceptionally(e -> { + pulsar.getExecutor().execute(() -> topics.remove(topicName.toString(), topicFuture)); + topicFuture.completeExceptionally(e.getCause()); + return null; }); + return topicFuture; } else { if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { @@ -1619,29 +1663,16 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c /** * It creates a topic async and returns CompletableFuture. It also throttles down configured max-concurrent topic * loading and puts them into queue once in-process topics are created. 
- * - * @param topic persistent-topic name - * @return CompletableFuture - * @throws RuntimeException */ - protected CompletableFuture> loadOrCreatePersistentTopic(final String topic, - boolean createIfMissing, Map properties) { - final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( - Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), - () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); - - topicFuture.exceptionally(t -> { - pulsarStats.recordTopicLoadFailed(); - return null; - }); - + protected CompletableFuture> loadOrCreatePersistentTopic(TopicLoadingContext context) { + final var topic = context.getTopicName().toString(); + final var topicFuture = context.getTopicFuture(); checkTopicNsOwnership(topic) .thenRun(() -> { final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { - checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture, - properties); + checkOwnershipAndCreatePersistentTopic(context); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); @@ -1656,8 +1687,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S return null; }); } else { - pendingTopicLoadingQueue.add(new TopicLoadingContext(topic, - createIfMissing, topicFuture, properties)); + pendingTopicLoadingQueue.add(context); if (log.isDebugEnabled()) { log.debug("topic-loading for {} added into pending queue", topic); } @@ -1700,50 +1730,41 @@ protected CompletableFuture> fetchTopicPropertiesAsync(Topic } } - private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing, - CompletableFuture> topicFuture, - Map properties) { - TopicName topicName = TopicName.get(topic); - pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) - .thenAccept(isActive -> { - if (isActive) { - CompletableFuture> propertiesFuture; - if (properties == null) { - //Read 
properties from storage when loading topic. - propertiesFuture = fetchTopicPropertiesAsync(topicName); - } else { - propertiesFuture = CompletableFuture.completedFuture(properties); - } - propertiesFuture.thenAccept(finalProperties -> - //TODO add topicName in properties? - createPersistentTopic0(topic, createIfMissing, topicFuture, - finalProperties) - ).exceptionally(throwable -> { - log.warn("[{}] Read topic property failed", topic, throwable); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(throwable); - return null; - }); - } else { - // namespace is being unloaded - String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); - log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - }).exceptionally(ex -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex); - return null; - }); + private void checkOwnershipAndCreatePersistentTopic(TopicLoadingContext context) { + TopicName topicName = context.getTopicName(); + final var topic = topicName.toString(); + final var topicFuture = context.getTopicFuture(); + checkTopicNsOwnership(topic).thenRun(() -> { + CompletableFuture> propertiesFuture; + if (context.getProperties() == null) { + //Read properties from storage when loading topic. + propertiesFuture = fetchTopicPropertiesAsync(topicName); + } else { + propertiesFuture = CompletableFuture.completedFuture(context.getProperties()); + } + propertiesFuture.thenAccept(finalProperties -> { + context.setProperties(finalProperties); + //TODO add topicName in properties? 
+ createPersistentTopic0(context); + }).exceptionally(throwable -> { + log.warn("[{}] Read topic property failed", topic, throwable); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(throwable); + return null; + }); + }).exceptionally(e -> { + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(e.getCause()); + return null; + }); } @VisibleForTesting - public void createPersistentTopic0(final String topic, boolean createIfMissing, - CompletableFuture> topicFuture, - Map properties) { - TopicName topicName = TopicName.get(topic); - final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + public void createPersistentTopic0(TopicLoadingContext context) { + TopicName topicName = context.getTopicName(); + final var topic = topicName.toString(); + final var topicFuture = context.getTopicFuture(); + final var createIfMissing = context.isCreateIfMissing(); if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); @@ -1777,7 +1798,9 @@ public void createPersistentTopic0(final String topic, boolean createIfMissing, new ManagedLedgerInterceptorImpl(interceptors, brokerEntryPayloadProcessors)); } managedLedgerConfig.setCreateIfMissing(createIfMissing); - managedLedgerConfig.setProperties(properties); + if (context.getProperties() != null) { + managedLedgerConfig.setProperties(context.getProperties()); + } String shadowSource = managedLedgerConfig.getShadowSource(); if (shadowSource != null) { managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding()); @@ -1822,10 +1845,11 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { return persistentTopic.checkDeduplicationStatus(); }) .thenRun(() -> { - log.info("Created topic {} - dedup is {}", topic, - persistentTopic.isDeduplicationEnabled() ? 
"enabled" : "disabled"); - long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - - topicCreateTimeMs; + long nowInNanos = System.nanoTime(); + long topicLoadLatencyMs = context.latencyMs(nowInNanos); + log.info("Created topic {} - dedup is {} (latency: {})", topic, + persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled", + context.latencyString(nowInNanos)); pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); if (!topicFuture.complete(Optional.of(persistentTopic))) { // Check create persistent topic timeout. @@ -2705,8 +2729,8 @@ private void unloadDeletedReplNamespace(Policies data, NamespaceName namespace) pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(), bundle.getBundleRange()); } catch (Exception e) { - log.error("Failed to unload namespace-bundle {}-{} that not owned by {}, {}", - namespace.toString(), bundle.toString(), localCluster, e.getMessage()); + log.error("Failed to unload namespace-bundle {} that not owned by {}, {}", + bundle.toString(), localCluster, e.getMessage()); } }); } @@ -3220,15 +3244,13 @@ private void createPendingLoadTopic() { return; } - final String topic = pendingTopic.getTopic(); + pendingTopic.polledFromQueue(); + final String topic = pendingTopic.getTopicName().toString(); checkTopicNsOwnership(topic).thenRun(() -> { CompletableFuture> pendingFuture = pendingTopic.getTopicFuture(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - checkOwnershipAndCreatePersistentTopic(topic, - pendingTopic.isCreateIfMissing(), - pendingFuture, - pendingTopic.getProperties()); + checkOwnershipAndCreatePersistentTopic(pendingTopic); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { @@ -3796,13 +3818,4 @@ private TopicFactory createPersistentTopicFactory() throws Exception { public void 
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) { this.pulsarChannelInitFactory = factory; } - - @AllArgsConstructor - @Getter - private static class TopicLoadingContext { - private final String topic; - private final boolean createIfMissing; - private final CompletableFuture> topicFuture; - private final Map properties; - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 67b4b08234383..9bb851b24307c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -508,8 +508,12 @@ private CompletableFuture isTopicOperationAllowed(TopicName topicName, } @Override - protected void handleLookup(CommandLookupTopic lookup) { + protected void handleLookup(CommandLookupTopic lookupParam) { checkArgument(state == State.Connected); + + // Make a copy since the command is handled asynchronously + CommandLookupTopic lookup = new CommandLookupTopic().copyFrom(lookupParam); + final long requestId = lookup.getRequestId(); final boolean authoritative = lookup.isAuthoritative(); @@ -596,8 +600,13 @@ private void writeAndFlush(ByteBuf cmd) { } @Override - protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) { + protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadataParam) { checkArgument(state == State.Connected); + + // Make a copy since the command is handled asynchronously + CommandPartitionedTopicMetadata partitionMetadata = + new CommandPartitionedTopicMetadata().copyFrom(partitionMetadataParam); + final long requestId = partitionMetadata.getRequestId(); if (log.isDebugEnabled()) { log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(), @@ -3165,8 +3174,12 @@ protected void 
handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) { } @Override - protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) { + protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicListParam) { checkArgument(state == State.Connected); + + // make a copy since command is handled asynchronously + CommandWatchTopicList commandWatchTopicList = new CommandWatchTopicList().copyFrom(commandWatchTopicListParam); + final long requestId = commandWatchTopicList.getRequestId(); final long watcherId = commandWatchTopicList.getWatcherId(); final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java new file mode 100644 index 0000000000000..9e3ed230cd26b --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import org.apache.pulsar.common.naming.TopicName; +import org.jspecify.annotations.Nullable; + +@RequiredArgsConstructor +public class TopicLoadingContext { + + private static final String EXAMPLE_LATENCY_OUTPUTS = "1234 ms (queued: 567)"; + + private final long startNs = System.nanoTime(); + @Getter + private final TopicName topicName; + @Getter + private final boolean createIfMissing; + @Getter + private final CompletableFuture> topicFuture; + @Getter + @Setter + @Nullable private Map properties; + private long polledFromQueueNs = -1L; + + public void polledFromQueue() { + polledFromQueueNs = System.nanoTime(); + } + + public long latencyMs(long nowInNanos) { + return TimeUnit.NANOSECONDS.toMillis(nowInNanos - startNs); + } + + public String latencyString(long nowInNanos) { + final var builder = new StringBuilder(EXAMPLE_LATENCY_OUTPUTS.length()); + builder.append(latencyMs(nowInNanos)); + builder.append(" ms"); + if (polledFromQueueNs >= 0) { + builder.append(" (queued: ").append(latencyMs(polledFromQueueNs)).append(")"); + } + return builder.toString(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index 2f79b972b367c..3f10553902a5c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; 
import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; @@ -521,12 +522,14 @@ public void testMetadataServiceNotAvailable() { AtomicReference atomicLoadManager = new AtomicReference<>(loadManager); ManagedLedgerFactoryImpl factory = mock(ManagedLedgerFactoryImpl.class); doReturn(false).when(factory).isMetadataServiceAvailable(); - LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, null, factory); + LoadSheddingTask task2 = spy(new LoadSheddingTask(atomicLoadManager, null, null, factory)); task2.run(); verify(loadManager, times(0)).doLoadShedding(); + verify(task2, times(1)).start(); doReturn(true).when(factory).isMetadataServiceAvailable(); task2.run(); verify(loadManager, times(1)).doLoadShedding(); + verify(task2, times(2)).start(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 49f5d7c5c36ce..56a08eac2096f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -125,6 +125,7 @@ import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -148,7 +149,8 @@ public class BrokerServiceTest extends BrokerTestBase { @Override protected void setup() throws Exception { conf.setSystemTopicEnabled(false); - conf.setTopicLevelPoliciesEnabled(false); + conf.setTopicLevelPoliciesEnabled(true); + 
conf.setTopicPoliciesServiceClassName(MockTopicPoliciesService.class.getName()); super.baseSetup(); } @@ -1184,7 +1186,8 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception { // try to create topic which should fail as bundle is disable CompletableFuture> futureResult = pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName, true, null); + .loadOrCreatePersistentTopic(new TopicLoadingContext(topic, true, + new CompletableFuture<>())); try { futureResult.get(); @@ -1227,8 +1230,8 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex ArrayList>> loadFutures = new ArrayList<>(); for (int i = 0; i < 10; i++) { // try to create topic which should fail as bundle is disable - CompletableFuture> futureResult = pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName + "_" + i, false, null); + CompletableFuture> futureResult = pulsar.getBrokerService().loadOrCreatePersistentTopic( + new TopicLoadingContext(TopicName.get(topicName + "_" + i), false, new CompletableFuture<>())); loadFutures.add(futureResult); } @@ -2035,5 +2038,28 @@ public void testPulsarMetadataEventSyncProducerCreation() throws Exception { retryStrategically((test) -> sync.getProducer() != null, 1000, 10); assertNotNull(sync.getProducer()); } + + @Test + public void testGetTopicWhenTopicPoliciesFail() throws Exception { + final var topicName = TopicName.get("prop/ns-abc/test-get-topic-when-topic-policies-fail"); + MockTopicPoliciesService.FAILED_TOPICS.add(topicName); + @Cleanup final var producer = pulsarClient.newProducer().topic(topicName.toString()).create(); + assertFalse(MockTopicPoliciesService.FAILED_TOPICS.contains(topicName)); + } + + static class MockTopicPoliciesService extends TopicPoliciesService.TopicPoliciesServiceDisabled { + + static final Set FAILED_TOPICS = ConcurrentHashMap.newKeySet(); + + @Override + public CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type) { + if 
(FAILED_TOPICS.contains(topicName)) { + // Only fail once + FAILED_TOPICS.remove(topicName); + return CompletableFuture.failedFuture(new RuntimeException("injected failure for " + topicName)); + } + return CompletableFuture.completedFuture(Optional.empty()); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 96ca2d90f0613..37cf75d84ca6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -79,7 +79,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -162,7 +161,6 @@ public void setup() throws Exception { NamespaceService nsSvc = pulsarTestContext.getPulsarService().getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class)); - doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class)); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index 20f58f277a39c..2f8a924635116 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -51,7 +51,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +102,6 @@ public void setup(Method m) throws Exception { NamespaceService nsSvc = mock(NamespaceService.class); doReturn(nsSvc).when(pulsar).getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class)); - doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class)); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 42defbe293f98..eaec93f78a162 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -225,8 +225,6 @@ public void setup() throws Exception { NamespaceBundle bundle = mock(NamespaceBundle.class); doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any()); doReturn(true).when(nsSvc).isServiceUnitOwned(any()); - doReturn(true).when(nsSvc).isServiceUnitActive(any()); - doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any()); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); 
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 4d734081e43cd..2cfbac35bfcfe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -231,8 +231,6 @@ public void setup() throws Exception { .getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(), any()); doReturn(true).when(namespaceService).isServiceUnitOwned(any()); - doReturn(true).when(namespaceService).isServiceUnitActive(any()); - doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any()); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics( NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics( @@ -1601,8 +1599,8 @@ public void testProducerOnNotOwnedTopic() throws Exception { setChannelConnected(); // Force the case where the broker doesn't own any topic - doReturn(CompletableFuture.completedFuture(false)).when(namespaceService) - .isServiceUnitActiveAsync(any(TopicName.class)); + doReturn(CompletableFuture.failedFuture(new ServiceUnitNotReadyException("failed"))).when(brokerService) + .checkTopicNsOwnership(any(String.class)); // test PRODUCER failure case ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index ffdb8610d58eb..ec16cc7d9ebc2 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import lombok.Cleanup; @@ -1046,11 +1047,14 @@ public void testSeekWillNotEncounteredFencedError() throws Exception { // Create a pulsar client with a subscription fenced counter. ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); AtomicInteger receivedFencedErrorCounter = new AtomicInteger(); + // Count switch: Default off, turn on again before seek starts. + final AtomicBoolean countAfterSeek = new AtomicBoolean(false); @Cleanup PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { protected void handleError(CommandError error) { - if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced")) { + if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced") + && countAfterSeek.get()) { receivedFencedErrorCounter.incrementAndGet(); } super.handleError(error); @@ -1087,10 +1091,9 @@ protected void handleError(CommandError error) { assertNotNull(msg); consumer.acknowledge(msg); } + countAfterSeek.set(true); consumer.seek(msgId1); - Awaitility.await().untilAsserted(() -> { - assertTrue(consumer.isConnected()); - }); + Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected())); assertEquals(receivedFencedErrorCounter.get(), 0); // cleanup. 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 93654db2c9992..5a54b37a637dd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -28,7 +28,6 @@ import io.netty.buffer.Unpooled; import io.opentelemetry.api.common.Attributes; import java.time.Duration; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -47,6 +46,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicLoadingContext; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; @@ -178,7 +178,8 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService), Mockito.eq(PersistentTopic.class)); - brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap()); + brokerService.createPersistentTopic0(new TopicLoadingContext(TopicName.get(topic), true, + new CompletableFuture<>())); Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); PersistentTopic persistentTopic = reference.get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index b7c323af5bcd4..3613ba516254c 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -245,7 +245,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { admin.topics().createNonPartitionedTopic(tpName); admin.namespaces().unload(ns); - // Inject an error when calling "NamespaceService.isServiceUnitActiveAsync". + // Inject an error when loading the topic AtomicInteger failedTimes = new AtomicInteger(); NamespaceService namespaceService = pulsar.getNamespaceService(); doAnswer(invocation -> { @@ -258,7 +258,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { return CompletableFuture.failedFuture(new RuntimeException("mocked error")); } return invocation.callRealMethod(); - }).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class)); + }).when(namespaceService).checkBundleOwnership(any(TopicName.class), any()); // Verify: the consumer can create successfully eventually. Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe(); @@ -295,7 +295,7 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding()); } return invocation.callRealMethod(); - }).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class)); + }).when(namespaceService).checkBundleOwnership(any(TopicName.class), any()); // Verify: the consumer create failed due to pulsar does not allow to create topic automatically. 
try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 3b30f0011e5c8..6e9bc4595a030 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -260,7 +260,7 @@ public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Excep field.set(clientCnxFuture.get(), false); } try { - clientWitBinaryLookup.getPartitionsForTopic(topic, false).join(); + clientWitBinaryLookup.getPartitionedTopicMetadata(topic, false, false).join(); Assert.fail("Expected an error that the broker version is too old."); } catch (Exception ex) { Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported by the broker")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java index 896bf32ad3798..2e73080b60d7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java @@ -23,6 +23,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.opentelemetry.api.OpenTelemetry; import java.util.Collections; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; @@ -56,6 +57,13 @@ public void testCheckSequenceId() throws Exception { Consumer consumer = client.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub") .subscribe(); + // Create a producer + Producer producer = client.newProducer(Schema.STRING).topic(topicName).create(); + // Move the fence timing to after the first message is successfully written + // The current ledger is 
not empty, the Broker recovery will not take the abnormal path of + // "deleting empty ledger + unable to find old ledger" + producer.send("Hello-0"); + // Fence the topic by opening the ManagedLedger for the topic outside the Pulsar broker. This will cause the // broker to fail subsequent send operation and it will trigger a recover EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); @@ -67,15 +75,12 @@ public void testCheckSequenceId() throws Exception { ml.close(); clientFactory.close(); - // Create a producer - Producer producer = client.newProducer(Schema.STRING).topic(topicName).create(); - - for (int i = 0; i < num; i++) { + for (int i = 1; i < num; i++) { producer.send("Hello-" + i); } for (int i = 0; i < num; i++) { - Message msg = consumer.receive(); + Message msg = consumer.receive(10, TimeUnit.SECONDS); assertEquals(msg.getValue(), "Hello-" + i); assertEquals(msg.getSequenceId(), i); consumer.acknowledge(msg); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 45e46304b54a4..64dd527468a16 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -2297,7 +2297,7 @@ public void testAcknowledgeWithReconnection() throws Exception { Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(), - 5)); + 0)); // Make consumer reconnect to broker admin.topics().unload(topicName); diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml index 964381d8fe653..9851a2cdc55d4 100644 --- a/pulsar-client-all/pom.xml +++ b/pulsar-client-all/pom.xml @@ -499,6 +499,17 @@ + + org.apache.maven.plugins + maven-source-plugin + + + + attach-sources + none + + + org.apache.maven.plugins maven-enforcer-plugin diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index cf88772a25f7c..efee2fbe06f6a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1234,7 +1234,7 @@ protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess c CompletableFuture requestFuture = (CompletableFuture) pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(commandWatchTopicListSuccess); + requestFuture.complete(new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess)); } else { duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java index 2f73a6af40666..65aeac40ce7ce 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java @@ -35,6 +35,7 @@ public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsReco private final DoubleAdder sendMsgsRateAggregate; private final DoubleAdder sendBytesRateAggregate; private int partitions = 0; + private int pendingQueueSize; public PartitionedTopicProducerStatsRecorderImpl() { super(); @@ -46,6 +47,7 @@ public PartitionedTopicProducerStatsRecorderImpl() { void reset() { super.reset(); partitions = 0; + pendingQueueSize = 0; } void updateCumulativeStats(String partition, ProducerStats stats) { @@ -58,6 +60,7 @@ void updateCumulativeStats(String partition, ProducerStats stats) { sendMsgsRateAggregate.add(stats.getSendMsgsRate()); 
sendBytesRateAggregate.add(stats.getSendBytesRate()); partitions++; + pendingQueueSize += stats.getPendingQueueSize(); } @Override @@ -75,5 +78,10 @@ public Map getPartitionStats() { return partitionStats; } + @Override + public int getPendingQueueSize() { + return pendingQueueSize; + } + private static final Logger log = LoggerFactory.getLogger(PartitionedTopicProducerStatsRecorderImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java index d828d729cc708..5e8c746ed8948 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java @@ -332,7 +332,7 @@ public double getSendLatencyMillisMax() { @Override public int getPendingQueueSize() { - return producer.getPendingQueueSize(); + return producer != null ? producer.getPendingQueueSize() : 0; } public void cancelStatsTimeout() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 1698322dc341f..012d285529a04 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -1250,7 +1250,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, @Override public CompletableFuture> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled) { - return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, false).thenApply(metadata -> { + return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, true).thenApply(metadata -> { if (metadata.partitions > 0) { TopicName topicName = TopicName.get(topic); List partitions = new ArrayList<>(metadata.partitions); 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index e0721ffe90597..c0a75b09ccea1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -334,7 +335,9 @@ public void testCreateWatcher() { .setRequestId(7) .setWatcherId(5).setTopicsHash("f00"); cnx.handleCommandWatchTopicListSuccess(success); - assertEquals(result.getNow(null), success); + assertThat(result.getNow(null)) + .usingRecursiveComparison() + .comparingOnlyFields("requestId", "watcherId", "topicsHash"); }); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java index 8f648bfd9ffbc..1981c2c8ff503 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java @@ -93,4 +93,29 @@ public void testPartitionTopicAggegationStats() { assertTrue(recorder2.getSendBytesRate() > 0); assertTrue(recorder2.getSendMsgsRate() > 0); } + + @Test + public void testPartitionedTopicProducerStatsPendingQueueSizeDoesntNPE() { + PartitionedTopicProducerStatsRecorderImpl recorder = new PartitionedTopicProducerStatsRecorderImpl(); + assertEquals(recorder.getPendingQueueSize(), 0); + } + + @Test + public void testProducerStatsPendingQueueSizeDoesntNPE() { + ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl(); + assertEquals(recorder.getPendingQueueSize(), 0); 
+ } + + @Test + public void testPartitionedTopicProducerStatsPendingQueueSizeAggregated() { + PartitionedTopicProducerStatsRecorderImpl recorder = new PartitionedTopicProducerStatsRecorderImpl(); + + ProducerStatsRecorderImpl individualStats = spy(new ProducerStatsRecorderImpl()); + when(individualStats.getPendingQueueSize()).thenReturn(1); + recorder.updateCumulativeStats("1", individualStats); + recorder.updateCumulativeStats("2", individualStats); + recorder.updateCumulativeStats("3", individualStats); + + assertEquals(recorder.getPendingQueueSize(), 3); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index c05b1d796dfdd..41658e62f1b78 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -483,6 +483,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } finally { buffer.release(); + // Clear the fields in cmd to release memory. + // The clear() call below also helps prevent misuse of holding references to command objects after + // handle* methods complete, as per the class javadoc requirement. + // While this doesn't completely prevent such misuse, it makes tests more likely to catch violations. 
+ cmd.clear(); } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index 7f0dc6fba10f3..d055dd7da55fb 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -20,14 +20,14 @@ import io.opentelemetry.api.OpenTelemetry; import io.oxia.client.api.AsyncOxiaClient; -import io.oxia.client.api.DeleteOption; import io.oxia.client.api.Notification; import io.oxia.client.api.OxiaClientBuilder; -import io.oxia.client.api.PutOption; import io.oxia.client.api.PutResult; import io.oxia.client.api.Version; import io.oxia.client.api.exceptions.KeyAlreadyExistsException; import io.oxia.client.api.exceptions.UnexpectedVersionIdException; +import io.oxia.client.api.options.DeleteOption; +import io.oxia.client.api.options.PutOption; import java.time.Duration; import java.util.Collections; import java.util.EnumSet; @@ -133,7 +133,7 @@ Optional convertGetResult( return Optional.of(result) .map( oxiaResult -> - new GetResult(oxiaResult.getValue(), convertStat(path, oxiaResult.getVersion()))); + new GetResult(oxiaResult.value(), convertStat(path, oxiaResult.version()))); } Stat convertStat(String path, Version version) { @@ -149,7 +149,7 @@ Stat convertStat(String path, Version version) { @Override public CompletableFuture> getChildrenFromStore(String path) { - var pathWithSlash = path + "/"; + var pathWithSlash = path.endsWith("/") ? 
path : path + "/"; return client .list(pathWithSlash, pathWithSlash + "/") diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index 2af6760cbc4ad..9bd2ddd5e8f22 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -667,27 +667,15 @@ public void testGetChildren(String provider, Supplier urlSupplier) throw store.put("/a/a-2", "value1".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); store.put("/b/c/b/1", "value1".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); - List subPaths = store.getChildren("/").get(); - Set ignoredRootPaths = Set.of("zookeeper"); - Set expectedSet = Set.of("a", "b"); - for (String subPath : subPaths) { - if (ignoredRootPaths.contains(subPath)) { - continue; - } - assertThat(expectedSet).contains(subPath); - } + List subPaths = new ArrayList<>(store.getChildren("/").get()); + subPaths.remove("zookeeper"); // ignored + assertThat(subPaths).containsExactlyInAnyOrderElementsOf(Set.of("a", "b")); List subPaths2 = store.getChildren("/a").get(); - Set expectedSet2 = Set.of("a-1", "a-2"); - for (String subPath : subPaths2) { - assertThat(expectedSet2).contains(subPath); - } + assertThat(subPaths2).containsExactlyInAnyOrderElementsOf(Set.of("a-1", "a-2")); List subPaths3 = store.getChildren("/b").get(); - Set expectedSet3 = Set.of("c"); - for (String subPath : subPaths3) { - assertThat(expectedSet3).contains(subPath); - } + assertThat(subPaths3).containsExactlyInAnyOrderElementsOf(Set.of("c")); } @Test(dataProvider = "impl") diff --git a/src/settings.xml b/src/settings.xml index 80ec2f40620e7..b27c935634037 100644 --- a/src/settings.xml +++ b/src/settings.xml @@ -26,7 +26,7 @@ ${env.APACHE_USER} ${env.APACHE_PASSWORD} - + apache.releases.https @@ -34,17 +34,4 @@ 
${env.APACHE_PASSWORD} - - - - apache - - true - - - ${env.GPG_EXECUTABLE} - ${env.GPG_PASSPHRASE} - - - diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java index 2e153b4ec7fdc..9fcbf66765931 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java @@ -19,8 +19,19 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; +import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.tests.integration.topologies.ClientTestBase; +import org.testng.Assert; import org.testng.annotations.Test; public class ClientTest25 extends PulsarStandaloneTestSuite25 { @@ -34,4 +45,47 @@ public void testResetCursorCompatibility(Supplier serviceUrl, Supplier msg, TopicMetadata metadata) { + return metadata.numPartitions() - 1; + } + }) + .topic(topic) + .create(); + @Cleanup final var consumer = pulsarClient.newConsumer().autoUpdatePartitions(true) + .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS) + .topic(topic).subscriptionName("sub") + .subscribe(); + @Cleanup final var multiTopicsConsumer = pulsarClient.newConsumer().autoUpdatePartitions(true) + .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS) + .topics(List.of(topic, topic2)).subscriptionName("sub-2").subscribe(); + + 
admin.topics().updatePartitionedTopic(topic, 3); + Thread.sleep(1500); + final var msgId = (MessageIdAdv) producer.send("msg".getBytes()); + Assert.assertEquals(msgId.getPartitionIndex(), 2); + + final var msg = consumer.receive(3, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getPartitionIndex(), 2); + final var msg2 = multiTopicsConsumer.receive(3, TimeUnit.SECONDS); + Assert.assertNotNull(msg2); + Assert.assertEquals(msg2.getMessageId(), msg.getMessageId()); + } } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java index 083706ceb391f..5eb11edf958f0 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java @@ -38,7 +38,7 @@ public void testCache() throws Exception { // test ttl offsetsCache.put(1, 2, 2); assertEquals(offsetsCache.getIfPresent(1, 2), 2); - Thread.sleep(1500); + Thread.sleep(2000); assertNull(offsetsCache.getIfPresent(1, 2)); offsetsCache.close(); }