Skip to content

feat: CDC-based mixed index synchronization (#4873)#4906

Draft
porunov wants to merge 1 commit into
JanusGraph:masterfrom
porunov:feature/4873-cdc-mixed-index
Draft

feat: CDC-based mixed index synchronization (#4873)#4906
porunov wants to merge 1 commit into
JanusGraph:masterfrom
porunov:feature/4873-cdc-mixed-index

Conversation

@porunov

@porunov porunov commented Jun 28, 2026

Copy link
Copy Markdown
Member

Keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with the graph by deriving their updates from a Change-Data-Capture stream of the committed graph data, instead of a synchronous second write during the transaction that can diverge on failure and leave a permanently stale index.

Pipeline (Apache Cassandra): commit (graph data only) -> Cassandra edgestore(cdc=true) ->
Debezium -> Kafka -> CdcIndexUpdateWorker consumer group -> reindex-from-current-state -> ES bulk.

Key design points:

  • Reindex-from-current-state: the worker reads each changed element's current graph state and fully replaces its index document (reusing IndexSerializer, like transaction recovery). This is idempotent and order-independent, so out-of-order or duplicate events still converge to the current state -- no strict ordering required, and a stale event can never overwrite a fresh value.
  • No dual write: the only synchronous write is to storage; the index is updated downstream from the committed change stream, so it cannot diverge.
  • Element-keyed Kafka partitioning gives per-element ordering and horizontal scaling via a consumer group; batches are de-duplicated and applied as one ElasticSearch _bulk per index.
  • At-least-once: offsets are committed only after a batch is durably applied; on failure the batch is reprocessed (rewind) rather than skipped, so the index eventually catches up.

Configuration (opt-in, disabled by default):

  • storage.cql.cdc: emit the Cassandra cdc=true table option on the edgestore table.
  • index.[X].cdc.enabled / index.[X].cdc.synchronous: per-index dual mode (write synchronously AND via CDC) or cdc-only mode (skip the synchronous write; ES updated solely via CDC).

Components:

  • janusgraph-core: per-index CDC config options, the commit-side skip hook in StandardJanusGraph, and MixedIndexUpdateApplier (the backend-agnostic reindex-from-current-state engine).
  • janusgraph-cql: the storage.cql.cdc table option (no Kafka dependency in production code).
  • janusgraph-cdc (new module; core + kafka-clients): the CdcEventDecoder SPI, DebeziumCassandraJsonDecoder, CdcWorkerConfiguration, CdcIndexUpdateWorker, and the standalone CdcIndexUpdateWorkerMain runner.

Testing: 38 tests, including unit/component coverage (decoder vs real serialized bytes, reindex engine, worker loop via Kafka MockConsumer, full-chain convergence over Lucene incl. vertex/edge add/update/remove and out-of-order delivery) and two real-container E2Es -- worker -> Kafka -> ElasticSearch, and the full Cassandra-CDC -> Debezium -> Kafka -> ElasticSearch pipeline. The full Debezium E2E is gated behind the cassandra-cdc-e2e Maven profile (auto-activated on Java 17+, required by Debezium 3.x and Testcontainers 2.x); the default Java 8/11 build excludes it and stays green.

Docs: advanced-topics/cdc-mixed-index.md operator guide, a 1.2.0 changelog upgrade note, and the regenerated configuration reference.

Fixes #4873
Replaces #4874


Thank you for contributing to JanusGraph!

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there an issue associated with this PR? Is it referenced in the commit message?
  • Does your PR body contain #xyz where xyz is the issue number you are trying to resolve?
  • Has your PR been rebased against the latest commit within the target branch (typically master)?
  • Is your initial contribution a single, squashed commit?

For code changes:

  • Have you written and/or updated unit tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE.txt file, including the main LICENSE.txt file in the root of this repository?
  • If applicable, have you updated the NOTICE.txt file, including the main NOTICE.txt file found in the root of this repository?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an opt-in Change-Data-Capture (CDC) pipeline to keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with committed graph data by asynchronously reindexing affected elements from their current state, eliminating the “dual write” divergence risk.

Changes:

  • Introduces per-index CDC configuration (index.[X].cdc.*) and commit-path logic to skip synchronous mixed-index writes in cdc-only mode.
  • Adds MixedIndexUpdateApplier (reindex-from-current-state) and CdcElementChange in core, plus Cassandra storage.cql.cdc table option support.
  • Adds a new janusgraph-cdc module implementing a Kafka consumer worker, Debezium Cassandra JSON decoder, and extensive unit/component/E2E tests + documentation.

Reviewed changes

Copilot reviewed 33 out of 33 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
pom.xml Registers new janusgraph-cdc Maven module in the build.
mkdocs.yml Adds CDC operator guide page to documentation nav.
janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/MixedIndexUpdateApplierTest.java Validates reindex-from-current-state behavior over Lucene in cdc-only mode.
janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/CdcSkipMutationTest.java Verifies synchronous mixed-index write is skipped in cdc-only and retained in dual mode.
janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLCdcTableOptionTest.java Unit-tests that storage.cql.cdc toggles cdc=true on edgestore DDL only.
janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java Refactors CREATE TABLE building and conditionally applies Cassandra cdc=true for edgestore.
janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java Adds storage.cql.cdc configuration option.
janusgraph-core/src/test/java/org/janusgraph/graphdb/configuration/CdcIndexConfigTest.java Tests defaults and per-index scoping of index.[X].cdc.*.
janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java Computes cdc-only backing indexes and skips synchronous mixed-index writes for them.
janusgraph-core/src/main/java/org/janusgraph/graphdb/database/log/TransactionLogHeader.java Makes TransactionLogHeader.Modification constructor public for reuse in decoding.
janusgraph-core/src/main/java/org/janusgraph/graphdb/database/index/MixedIndexUpdateApplier.java Adds backend-agnostic reindex-from-current-state applier for CDC worker.
janusgraph-core/src/main/java/org/janusgraph/graphdb/database/index/CdcElementChange.java Adds normalized “element changed” model consumed by the applier/worker.
janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java Adds index.[X].cdc.enabled and index.[X].cdc.synchronous options.
janusgraph-cdc/src/test/resources/cassandra-cdc.yaml Provides Cassandra config enabling CDC for full pipeline E2E test.
janusgraph-cdc/src/test/java/org/janusgraph/cdc/DebeziumCassandraJsonDecoderTest.java Tests Debezium JSON decoding against real JanusGraph-serialized bytes.
janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcWorkerConvergenceTest.java Drives worker+decoder+applier via MockConsumer to validate convergence semantics.
janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcWorkerConfigurationTest.java Tests worker configuration defaults and properties parsing.
janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcKafkaElasticsearchTest.java Testcontainers E2E for Kafka → worker → ElasticSearch convergence.
janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcIndexUpdateWorkerMainTest.java Tests runner wiring and config reflection of CDC-enabled backing indexes.
janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcIndexUpdateWorkerLoopTest.java Unit-tests polling loop semantics (dedupe/retry/commit/rewind).
janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcEventDecoderTest.java Smoke test for decoder SPI and CdcElementChange interop.
janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcCassandraDebeziumElasticsearchTest.java Full Cassandra CDC → Debezium → Kafka → ElasticSearch pipeline E2E (profile-gated).
janusgraph-cdc/src/test/java/io/debezium/connector/cassandra/JanusGraphCdcConnectorStarter.java Test-only bridge to start Debezium Cassandra connector embedded.
janusgraph-cdc/src/main/java/org/janusgraph/cdc/DebeziumCassandraJsonDecoder.java Implements Debezium Cassandra JSON → CdcElementChange decoding.
janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcWorkerConfiguration.java Defines immutable worker/Kafka configuration and consumer properties.
janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorkerMain.java Adds standalone runner that opens graph, wires decoder+applier, starts workers.
janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorker.java Implements Kafka consume/decode/dedupe/apply/retry/commit/rewind loop.
janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexApplier.java Functional interface to abstract index application for worker tests.
janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcEventDecoder.java Decoder SPI for CDC record formats.
janusgraph-cdc/pom.xml New module POM with Kafka clients + testcontainers/Debezium profile gating.
docs/configs/janusgraph-cfg.md Regenerates config reference including new CDC options.
docs/changelog.md Adds 1.2.0 upgrade note describing CDC mixed index synchronization.
docs/advanced-topics/cdc-mixed-index.md Adds operator guide for Cassandra CDC + Debezium + Kafka + worker setup.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread janusgraph-cdc/src/main/java/org/janusgraph/cdc/DebeziumCassandraJsonDecoder.java Outdated
Comment thread janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorker.java Outdated
Comment thread janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcIndexUpdateWorkerMainTest.java Outdated
@porunov porunov force-pushed the feature/4873-cdc-mixed-index branch 4 times, most recently from a7d91de to 5659caf Compare June 30, 2026 19:51
@porunov porunov requested a review from Copilot June 30, 2026 22:25

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 3 comments.

Comment thread janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorker.java Outdated
Comment thread janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorkerMain.java Outdated
@porunov porunov force-pushed the feature/4873-cdc-mixed-index branch 2 times, most recently from 48c32ba to 3e6cc0e Compare July 1, 2026 10:08
@porunov porunov requested a review from Copilot July 1, 2026 10:09

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.

Comments suppressed due to low confidence (1)

janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/MixedIndexUpdateApplierTest.java:1

  • The 10-second schema/index enablement timeout is likely to be flaky under loaded CI runners (especially with filesystem-backed Lucene + BerkeleyJE). Consider increasing these timeouts (e.g., 30–60 seconds) or using a shared constant aligned with other JanusGraph index-status tests. The same concern applies to other new tests using 10-second awaitGraphIndexStatus timeouts.

Comment thread .github/workflows/ci-cdc-dummy.yml
@porunov porunov force-pushed the feature/4873-cdc-mixed-index branch from 3e6cc0e to 0ee776b Compare July 1, 2026 10:42
@porunov porunov requested a review from Copilot July 1, 2026 11:13

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLCdcTableOptionTest.java:1

  • This assertion is fairly broad and could pass if an unrelated substring containing 'cdc' appears in the DDL. Tightening it to assert the specific table option (e.g., matching cdc = true / WITH cdc = true) would make the test more robust and less prone to false positives.

Comment thread janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorker.java Outdated
@porunov porunov force-pushed the feature/4873-cdc-mixed-index branch from 0ee776b to 0b2ce55 Compare July 1, 2026 11:45
@porunov porunov requested a review from Copilot July 1, 2026 11:50

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 1 comment.

@porunov porunov force-pushed the feature/4873-cdc-mixed-index branch from dbfa067 to 8754f87 Compare July 1, 2026 17:02
@porunov porunov requested a review from Copilot July 1, 2026 17:12

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 2 comments.

@porunov porunov force-pushed the feature/4873-cdc-mixed-index branch from 8754f87 to 37296aa Compare July 1, 2026 17:32
@porunov porunov requested a review from Copilot July 1, 2026 17:52

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 2 comments.

Comment thread janusgraph-cdc/pom.xml
@porunov porunov force-pushed the feature/4873-cdc-mixed-index branch from 37296aa to 25632c3 Compare July 1, 2026 18:26
@porunov porunov requested a review from Copilot July 1, 2026 18:28

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 8 comments.

@porunov porunov force-pushed the feature/4873-cdc-mixed-index branch from 25632c3 to b20b8fb Compare July 1, 2026 19:13
@porunov porunov requested a review from Copilot July 1, 2026 19:20

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.

Comment thread docs/advanced-topics/cdc-mixed-index.md
Comment thread .github/workflows/ci-cdc-dummy.yml

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 38 out of 38 changed files in this pull request and generated 1 comment.

@porunov porunov force-pushed the feature/4873-cdc-mixed-index branch from 7aaf3dc to 411925e Compare July 1, 2026 21:55
@porunov porunov requested a review from Copilot July 1, 2026 21:56

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 38 out of 38 changed files in this pull request and generated 1 comment.

Comment thread janusgraph-cdc/pom.xml
Keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with the graph by deriving
their updates from a Change-Data-Capture stream of the committed graph data, instead of a
synchronous second write during the transaction that can diverge on failure and leave a
permanently stale index.

Pipeline (Apache Cassandra): commit (graph data only) -> Cassandra edgestore(cdc=true) ->
Debezium -> Kafka -> CdcIndexUpdateWorker consumer group -> reindex-from-current-state -> ES bulk.

Key design points:
- Reindex-from-current-state: the worker reads each changed element's current graph state and
  fully replaces its index document (reusing IndexSerializer, like transaction recovery). This is
  idempotent and order-independent, so out-of-order or duplicate events still converge to the
  current state -- no strict ordering required, and a stale event can never overwrite a fresh value.
- No dual write: the only synchronous write is to storage; the index is updated downstream from
  the committed change stream, so it cannot diverge.
- Element-keyed Kafka partitioning gives per-element ordering and horizontal scaling via a
  consumer group; batches are de-duplicated and applied as one ElasticSearch _bulk per index.
- At-least-once: offsets are committed only after a batch is durably applied; on failure the
  batch is reprocessed (rewind) rather than skipped, so the index eventually catches up.

Configuration (opt-in, disabled by default):
- storage.cql.cdc: emit the Cassandra cdc=true table option on the edgestore table.
- index.[X].cdc.enabled / index.[X].cdc.synchronous: per-index dual mode (write synchronously AND
  via CDC) or cdc-only mode (skip the synchronous write; ES updated solely via CDC).

Components:
- janusgraph-core: per-index CDC config options, the commit-side skip hook in StandardJanusGraph
  (with a startup warning when cdc-only mode is configured), and MixedIndexUpdateApplier (the
  backend-agnostic reindex-from-current-state engine, covering vertex, edge and property-element
  mixed indexes). The restore paths (ElementCategory.retrieve, IndexSerializer.removeElement) now
  accept custom String vertex ids in addition to Long, and RelationIdentifierUtils.findRelation no
  longer NPEs when a relation's adjacent vertex has been removed.
- janusgraph-cql: the storage.cql.cdc table option (no Kafka dependency in production code).
- janusgraph-cdc (new module; core + kafka-clients): the CdcEventDecoder SPI,
  DebeziumCassandraJsonDecoder (parses the relation header directly, so IN-direction edge columns
  and value-less delete tombstones of MULTI edges resolve to the correct edge identity), the
  CdcWorkerConfiguration, the CdcIndexUpdateWorker (two-phase shutdown, interrupt-aware retries, no
  consumer leaks), and the standalone CdcIndexUpdateWorkerMain runner. The deletion limits of
  cdc-only mode for constrained-multiplicity edge indexes and meta-property indexes are documented.

Testing: 54 tests, including unit/component coverage (decoder vs real serialized bytes incl.
poison-pill skips for invalid-Base64/malformed-key, delete-envelope after=null/before fallback,
IN-direction columns and value-less edge-delete tombstones; reindex engine over
vertex/edge/property-element indexes incl. document removal when an element loses all indexed
fields, removed-endpoint edges, and custom String vertex ids; worker loop via Kafka MockConsumer
incl. at-least-once rewind on decode/apply failure and leak-free lifecycle; fail-fast config and
CDC-enablement validation; full-chain convergence over Lucene incl. vertex/edge add/update/remove
and out-of-order delivery) and two real-container E2Es -- worker -> Kafka -> ElasticSearch, and the
full Cassandra-CDC -> Debezium -> Kafka -> ElasticSearch pipeline covering the vertex AND edge
lifecycle (add/update/property-removal/delete) against real Debezium delete envelopes. The real-container tests are gated behind the cassandra-cdc-e2e
Maven profile (auto-activated on Java 17+, required by Debezium 3.x and Testcontainers 2.x); the
default Java 8/11 build excludes them and stays green.

CI: a dedicated workflow (.github/workflows/ci-cdc.yml) runs the cdc unit tests on Java 8 and 11 and
the full real-container suite -- including the Cassandra-CDC -> Debezium -> Kafka -> ElasticSearch
pipeline -- on Java 17 with Docker, so the integration is exercised on every change and guards
against regression.

Docs: advanced-topics/cdc-mixed-index.md operator guide, a 1.2.0 changelog upgrade note, and the
regenerated configuration reference.

Fixes JanusGraph#4873
Replaces JanusGraph#4874

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
@porunov porunov force-pushed the feature/4873-cdc-mixed-index branch from 411925e to daa717d Compare July 1, 2026 22:15
@porunov porunov requested a review from Copilot July 1, 2026 22:17

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 39 out of 39 changed files in this pull request and generated 3 comments.

Comment on lines +40 to +42
public static final String RETRY_LIMIT = "cdc.retry.limit";
public static final String RETRY_INITIAL_WAIT_MS = "cdc.retry.initial-wait-ms";
public static final String RETRY_MAX_WAIT_MS = "cdc.retry.max-wait-ms";
Comment on lines +128 to +133
private Collection<CdcElementChange> resolveChanges(Object vertexId, byte[] columnBytes, byte[] valueBytes) {
final StaticBuffer column = new StaticArrayBuffer(columnBytes);
final StaticBuffer value = new StaticArrayBuffer(valueBytes != null ? valueBytes : new byte[0]);
final Entry entry = StaticArrayEntry.of(column, value);
final StandardJanusGraphTx tx = (StandardJanusGraphTx) graph.newTransaction();
try {
Comment thread janusgraph-cdc/pom.xml
Comment on lines +27 to +31
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support CDC mode for Mixed Index mutations

2 participants