[fix][broker] Avoid attaching a consumer to a migrated non-persistent topic on subscribe#26075

Merged
lhotari merged 1 commit into apache:master from lhotari:lh-fix-migrated-topic-subscribe-race
Jun 22, 2026

lhotari commented Jun 22, 2026

This is a follow-up regression fix for #26051.

Motivation

#26051 changed NonPersistentTopic.internalSubscribe from a blocking, ordered migration redirect into a fire-and-forget async one:

if (isMigrated()) {
    getMigratedClusterUrlAsync().thenAccept(consumer::topicMigrated);
}
addConsumerToSubscription(subscription, consumer).thenRun(...)

getMigratedClusterUrlAsync() only invokes topicMigrated() — which sends the TopicMigrated redirect and disconnects the consumer — after a metadata-read hop on pulsar.getExecutor(), while addConsumerToSubscription runs immediately. The two therefore race.
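
The ordering problem can be reproduced in isolation with plain CompletableFuture. The following is a minimal sketch, not the broker code: MigrationRaceSketch and the string event names are hypothetical, and the manually completed future stands in for the metadata read behind getMigratedClusterUrlAsync(). It forces the losing interleaving so the result is deterministic; in the broker the outcome depends on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the fire-and-forget shape; not the Pulsar classes.
final class MigrationRaceSketch {

    /** Returns the order in which the two steps ran. */
    static List<String> fireAndForget() {
        List<String> events = new ArrayList<>();
        // Stands in for getMigratedClusterUrlAsync(): still pending when subscribe continues.
        CompletableFuture<String> migratedUrl = new CompletableFuture<>();

        // The redirect is only registered as a callback; nothing waits for it.
        migratedUrl.thenAccept(url -> events.add("topicMigrated(" + url + ")"));

        // The add runs immediately on the calling thread.
        events.add("addConsumer");

        // The metadata read finishes afterwards, so the redirect arrives too late.
        migratedUrl.complete("pulsar://new-cluster:6650");
        return events;
    }

    public static void main(String[] args) {
        // prints [addConsumer, topicMigrated(pulsar://new-cluster:6650)]
        System.out.println(fireAndForget());
    }
}
```

Because thenAccept only registers a callback on a pending future, the consumer is already attached by the time the redirect is delivered.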

The later if (!cnx.isActive()) guard does not mitigate this, because for protocol version v5 and above disconnect() -> closeConsumer() only sends a CloseConsumer command and never flips cnx.isActive() to false. As a result, a consumer subscribing to a migrated non-persistent topic can be left attached to the subscription on the old (migrated) cluster instead of being cleanly redirected.
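
A rough sketch of why the guard misses this case is shown below. FakeCnx is a hypothetical stand-in, not ServerCnx, and the pre-v5 branch (closing the connection) is an assumption of this sketch; the PR text only describes the v5-and-above behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the connection-level guard; not the Pulsar classes.
final class ActiveGuardSketch {
    static final int V5 = 5;

    static final class FakeCnx {
        final int protocolVersion;
        final List<String> sent = new ArrayList<>();
        boolean active = true;

        FakeCnx(int protocolVersion) {
            this.protocolVersion = protocolVersion;
        }

        boolean isActive() {
            return active;
        }

        void closeConsumer(long consumerId) {
            if (protocolVersion >= V5) {
                // v5 and above: only a command is sent, the connection stays active.
                sent.add("CloseConsumer(" + consumerId + ")");
            } else {
                // Assumption for this sketch: older clients lose the whole connection.
                active = false;
            }
        }
    }

    /** True when an "if (!cnx.isActive())" check would notice the disconnect. */
    static boolean guardCatchesDisconnect(int protocolVersion) {
        FakeCnx cnx = new FakeCnx(protocolVersion);
        cnx.closeConsumer(1L);
        return !cnx.isActive();
    }

    public static void main(String[] args) {
        System.out.println(guardCatchesDisconnect(5)); // prints false
    }
}
```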

Before #26051 this code was synchronous, so topicMigrated() always completed before addConsumerToSubscription; the async change introduced the race.

This is specific to NonPersistentTopic: PersistentTopic.internalSubscribe has no inline migration check and relies solely on the post-subscribe check in ServerCnx.

Modifications

  • Route the inline migration check through the existing Consumer.checkAndApplyTopicMigrationAsync() (already used by ServerCnx on the subscribe-success path) and sequence addConsumerToSubscription behind it with thenCompose:
    • when the topic is migrated, the consumer is redirected and disconnected and addConsumerToSubscription is skipped, so the consumer is never attached to the subscription on the old cluster;
    • otherwise the original add path runs unchanged (byte-for-byte).
  • Failures from resolving the migrated-cluster URL now surface through the existing exceptionally handler instead of being silently swallowed by the fire-and-forget thenAccept. This metadata read is only reached when the subscription is already migrated (checkAndApplyTopicMigrationAsync() short-circuits to false otherwise), so the common, non-migrated subscribe path is unchanged and gains no new failure mode.
  • The topic usage count stays balanced without any extra bookkeeping: on the migrated path topicMigrated() -> disconnect() -> Consumer.close() -> NonPersistentSubscription.removeConsumer() already calls decrementUsageCount() (even when the consumer was never added), balancing the increment taken by handleConsumerAdded().
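
The sequencing described in the list above can be sketched with plain CompletableFuture as follows. This is an illustration under stated assumptions, not the merged change: SequencedSubscribeSketch, its event strings, and its counter are hypothetical stand-ins for the broker types, and the body of the exceptionally handler (recording the failure and releasing the usage count) is assumed, since the PR text does not show what the existing handler does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the sequenced subscribe path; not the Pulsar classes.
final class SequencedSubscribeSketch {
    final List<String> events = new ArrayList<>();
    final AtomicInteger usageCount = new AtomicInteger();

    // Stands in for Consumer.checkAndApplyTopicMigrationAsync():
    // true means the consumer was redirected and disconnected.
    CompletableFuture<Boolean> checkAndApplyMigration(boolean migrated,
                                                      CompletableFuture<String> urlLookup) {
        if (!migrated) {
            // Short-circuit: the metadata read is never touched on the common path.
            return CompletableFuture.completedFuture(false);
        }
        return urlLookup.thenApply(url -> {
            events.add("redirect:" + url);
            // Models disconnect -> close -> removeConsumer releasing the usage count.
            usageCount.decrementAndGet();
            return true;
        });
    }

    CompletableFuture<Void> addConsumer() {
        events.add("addConsumer");
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> subscribe(boolean migrated, CompletableFuture<String> urlLookup) {
        usageCount.incrementAndGet(); // models the increment taken by handleConsumerAdded()
        return checkAndApplyMigration(migrated, urlLookup)
                // The add is composed behind the check and skipped after a redirect.
                .thenCompose(redirected -> redirected
                        ? CompletableFuture.<Void>completedFuture(null)
                        : addConsumer())
                .exceptionally(ex -> {
                    // Assumed handler body: record the failure and release the count.
                    events.add("subscribe failed");
                    usageCount.decrementAndGet();
                    return null;
                });
    }

    public static void main(String[] args) {
        SequencedSubscribeSketch topic = new SequencedSubscribeSketch();
        CompletableFuture<String> lookup = new CompletableFuture<>();
        topic.subscribe(true, lookup);
        lookup.complete("pulsar://new-cluster:6650");
        // prints [redirect:pulsar://new-cluster:6650] usage=0
        System.out.println(topic.events + " usage=" + topic.usageCount.get());
    }
}
```

Compared with registering the redirect through thenAccept, composing the add behind the check means a pending or failed URL lookup can no longer be overtaken by the add, and a lookup failure reaches the handler instead of being dropped.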

Verifying this change

This change added tests and can be verified as follows:

  • Added NonPersistentTopicTest.testSubscribeOnMigratedTopicSkipsAddingConsumer, a deterministic regression test that subscribes directly on a migrated non-persistent topic and asserts the consumer is never added to the subscription (verify(subscription, never()).addConsumer(...)), the migration redirect is sent, and the topic usage count returns to its pre-subscribe value. The test fails on the previous code (Mockito NeverWantedButInvoked: addConsumer is invoked via addConsumerToSubscription) and passes with the fix.
  • The existing end-to-end migration suite ClusterMigrationTest and the full NonPersistentTopicTest continue to pass.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

… topic on subscribe

Motivation

PR apache#26051 changed NonPersistentTopic.internalSubscribe from a blocking,
ordered migration redirect into a fire-and-forget async one:

    if (isMigrated()) {
        getMigratedClusterUrlAsync().thenAccept(consumer::topicMigrated);
    }
    addConsumerToSubscription(subscription, consumer).thenRun(...)

getMigratedClusterUrlAsync() only invokes topicMigrated() (which sends the
TopicMigrated redirect and disconnects the consumer) after a metadata-read hop
on pulsar.getExecutor(), while addConsumerToSubscription runs immediately, so
the two race. The downstream "if (!cnx.isActive())" guard does not mitigate it:
for protocol >= v5, disconnect() -> closeConsumer() only sends a CloseConsumer
command and never flips cnx.isActive() to false. As a result a consumer
subscribing to a migrated non-persistent topic can be left attached to the
subscription on the old cluster.

Modifications

Route the inline migration check through the existing
Consumer.checkAndApplyTopicMigrationAsync() (already used by ServerCnx) and
sequence addConsumerToSubscription behind it with thenCompose: when the topic
is migrated, the consumer is redirected and disconnected and the add is
skipped; otherwise the original add path runs unchanged. The check now also
surfaces metadata-read failures through the existing exceptionally handler
instead of swallowing them (only reachable for already-migrated topics, so the
common subscribe path is unchanged). Usage count stays balanced: the migrated
path's disconnect -> close -> removeConsumer already calls decrementUsageCount.

Added NonPersistentTopicTest.testSubscribeOnMigratedTopicSkipsAddingConsumer,
which fails on the previous code (the consumer is added via
addConsumerToSubscription) and passes with the fix.

Assisted-by: Claude Code (Claude Opus 4.8)
@lhotari lhotari merged commit d351b6b into apache:master Jun 22, 2026
44 checks passed
lhotari added a commit that referenced this pull request Jun 22, 2026
lhotari added a commit that referenced this pull request Jun 22, 2026