Skip to content

[fix][broker] Guard BucketDelayedDeliveryTracker.nextDeliveryTime against empty queues#26080

Open
dao-jun wants to merge 1 commit into
apache:masterfrom
dao-jun:fix/nextDeliveryTime_IAE
Open

[fix][broker] Guard BucketDelayedDeliveryTracker.nextDeliveryTime against empty queues#26080
dao-jun wants to merge 1 commit into
apache:masterfrom
dao-jun:fix/nextDeliveryTime_IAE

Conversation

@dao-jun

@dao-jun dao-jun commented Jun 23, 2026

Copy link
Copy Markdown
Member

Motivation

The orphaned-message filter introduced in #25984 can leave BucketDelayedDeliveryTracker in a state where lastMutableBucket and sharedBucketPriorityQueue are both empty while numberDelayedMessages is still positive — the remaining messages live in not-yet-loaded snapshot segments of orphaned immutable buckets whose ledger
range falls entirely below the cursor's mark-deleted position.

In that state, nextDeliveryTime() falls through both existing if branches and unconditionally calls lastMutableBucket.nextDeliveryTime() → MutableBucket.nextDeliveryTime() → TripleLongPriorityQueue.peekN1(), which throws IllegalArgumentException("...") on the empty queue:

java.lang.IllegalArgumentException
at TripleLongPriorityQueue.peekN1(TripleLongPriorityQueue.java:126)
at MutableBucket.nextDeliveryTime(MutableBucket.java:222)
at BucketDelayedDeliveryTracker.nextDeliveryTime(BucketDelayedDeliveryTracker.java:621)
at AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:127)
at BucketDelayedDeliveryTracker.getScheduledMessages(BucketDelayedDeliveryTracker.java:754)

The exception propagates out of getScheduledMessages() / hasMessageAvailable() and surfaces on whatever broker thread polled the tracker (e.g., a dispatcher read path). The fall-through pattern has been there since PIP-195 (#17611, 2022); the trim filter made the both-empty-but-count-positive state reachable, so this is a
regression window introduced by #25984.

Modifications

  • Added a third branch to BucketDelayedDeliveryTracker.nextDeliveryTime() that returns Long.MAX_VALUE when both lastMutableBucket and sharedBucketPriorityQueue are empty. This signals "no imminent delivery" symmetrically with how the parent AbstractDelayedDeliveryTracker.updateTimer() already treats
    numberOfDelayedMessages == 0, and is interpreted correctly by both call sites:
    • hasMessageAvailable(): Long.MAX_VALUE <= cutoffTime is false → returns false.
    • updateTimer(): Long.MAX_VALUE - now is huge → schedules the timer far out; new addMessage() calls and a successful asyncTrimImmutableBuckets() will reschedule it. Netty's HashedWheelTimer clamps the nanos conversion at Long.MAX_VALUE (no overflow).
  • Added regression test BucketDelayedDeliveryTrackerTest.testGetScheduledMessagesWhenAllOrphaned. It seals 5 orphaned messages into a single immutable bucket (5 snapshot segments, only the first one loaded into the shared queue), flushes the mutable bucket into the shared queue by advancing the clock, then calls
    getScheduledMessages() and hasMessageAvailable(). Without the fix it fails with the IAE above; with the fix it asserts:
    • No exception is thrown.
    • scheduledMessages is empty (orphaned messages are filtered, not delivered).
    • numberDelayedMessages is still positive (storage-only messages keep the counter up).
    • hasMessageAvailable() returns false.

Scope note

This is a defensive fix for the crash only. The underlying inconsistency — numberDelayedMessages > 0 while no in-memory message exists — resolves itself once asyncTrimImmutableBuckets() deletes the orphaned bucket and decrements the counter, which happens as soon as immutableBuckets.size() > maxNumBuckets triggers
trim+merge again. A more thorough cleanup (e.g., probing the next snapshot segment inside the filter loop, or running trim on a schedule rather than only on bucket overflow) is intentionally out of scope for this PR and can be tracked separately if it turns into a real production concern.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants