[fix][broker] Guard BucketDelayedDeliveryTracker.nextDeliveryTime against empty queues#26080
Open
dao-jun wants to merge 1 commit into
Motivation
The orphaned-message filter introduced in #25984 can leave BucketDelayedDeliveryTracker in a state where lastMutableBucket and sharedBucketPriorityQueue are both empty while numberDelayedMessages is still positive: the remaining messages live in not-yet-loaded snapshot segments of orphaned immutable buckets whose ledger range falls entirely below the cursor's mark-deleted position.
In that state, nextDeliveryTime() falls through both existing if branches and unconditionally calls lastMutableBucket.nextDeliveryTime() → MutableBucket.nextDeliveryTime() → TripleLongPriorityQueue.peekN1(), which throws IllegalArgumentException("...") on the empty queue:
java.lang.IllegalArgumentException
at TripleLongPriorityQueue.peekN1(TripleLongPriorityQueue.java:126)
at MutableBucket.nextDeliveryTime(MutableBucket.java:222)
at BucketDelayedDeliveryTracker.nextDeliveryTime(BucketDelayedDeliveryTracker.java:621)
at AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:127)
at BucketDelayedDeliveryTracker.getScheduledMessages(BucketDelayedDeliveryTracker.java:754)
The exception propagates out of getScheduledMessages() / hasMessageAvailable() and surfaces on whatever broker thread polled the tracker (e.g., a dispatcher read path). The fall-through pattern has been there since PIP-195 (#17611, 2022), but it was harmless until the trim filter made the both-empty-but-count-positive state reachable, so the crash is a regression introduced by #25984.
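The fall-through described above can be reproduced in a simplified model. The sketch below is not the Pulsar code: it uses java.util.PriorityQueue as a stand-in for both lastMutableBucket and sharedBucketPriorityQueue, the class and method names are hypothetical, and the sentinel returned by the guarded variant (Long.MAX_VALUE) is an assumption of this sketch rather than the value this PR necessarily returns.

```java
import java.util.PriorityQueue;

public class NextDeliveryTimeSketch {
    // Hypothetical sentinel meaning "nothing in memory to schedule" (assumption of this sketch).
    static final long NO_IN_MEMORY_MESSAGE = Long.MAX_VALUE;

    // Stand-in for TripleLongPriorityQueue.peekN1(): rejects an empty queue.
    static long peekN1(PriorityQueue<Long> queue) {
        if (queue.isEmpty()) {
            throw new IllegalArgumentException("queue is empty");
        }
        return queue.peek();
    }

    // Mirrors the fall-through pattern: two branches, then an unconditional peek.
    static long unguarded(PriorityQueue<Long> mutable, PriorityQueue<Long> shared) {
        if (mutable.isEmpty() && !shared.isEmpty()) {
            return peekN1(shared);
        } else if (shared.isEmpty() && !mutable.isEmpty()) {
            return peekN1(mutable);
        }
        // Intended for "both non-empty", but also reached when both are empty.
        return Math.min(peekN1(mutable), peekN1(shared));
    }

    // Same logic with an explicit guard for the both-empty state.
    static long guarded(PriorityQueue<Long> mutable, PriorityQueue<Long> shared) {
        if (mutable.isEmpty() && shared.isEmpty()) {
            return NO_IN_MEMORY_MESSAGE;
        }
        return unguarded(mutable, shared);
    }

    public static void main(String[] args) {
        PriorityQueue<Long> mutable = new PriorityQueue<>();
        PriorityQueue<Long> shared = new PriorityQueue<>();
        try {
            unguarded(mutable, shared);
            System.out.println("unguarded returned normally");
        } catch (IllegalArgumentException e) {
            System.out.println("unguarded threw: " + e.getMessage());
        }
        System.out.println("guarded returned sentinel: "
                + (guarded(mutable, shared) == NO_IN_MEMORY_MESSAGE));
    }
}
```

In this model the guard only changes behaviour in the both-empty state; every other combination still goes through the original branches, which is the property a defensive fix of this kind would want to preserve.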
Modifications
numberOfDelayedMessages == 0, and is interpreted correctly by both call sites:
getScheduledMessages() and hasMessageAvailable(). Without the fix it fails with the IAE above; with the fix it asserts:
Scope note
This is a defensive fix for the crash only. The underlying inconsistency (numberDelayedMessages > 0 while no in-memory message exists) resolves itself once asyncTrimImmutableBuckets() deletes the orphaned bucket and decrements the counter, which happens as soon as immutableBuckets.size() > maxNumBuckets triggers trim+merge again. A more thorough cleanup (e.g., probing the next snapshot segment inside the filter loop, or running trim on a schedule rather than only on bucket overflow) is intentionally out of scope for this PR and can be tracked separately if it turns into a real production concern.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)