feat: CDC-based mixed index synchronization (#4873) #4906
Pull request overview
Adds an opt-in Change-Data-Capture (CDC) pipeline to keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with committed graph data by asynchronously reindexing affected elements from their current state, eliminating the “dual write” divergence risk.
Changes:
- Introduces per-index CDC configuration (`index.[X].cdc.*`) and commit-path logic to skip synchronous mixed-index writes in cdc-only mode.
- Adds `MixedIndexUpdateApplier` (reindex-from-current-state) and `CdcElementChange` in core, plus Cassandra `storage.cql.cdc` table option support.
- Adds a new `janusgraph-cdc` module implementing a Kafka consumer worker, Debezium Cassandra JSON decoder, and extensive unit/component/E2E tests + documentation.
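To make the per-index modes concrete, here is a minimal sketch of how the two flags might combine. It assumes that `index.[X].cdc.synchronous=true` keeps the in-transaction index write (dual mode) and `false` selects cdc-only mode, and it assumes a default of `true` for that flag; the `CdcModeSketch` class, the `Mode` enum and the index name `search` are hypothetical and not taken from the PR.

```java
import java.util.Properties;

// Illustrative only: maps the two per-index flags to an effective mode.
// Assumption: cdc.synchronous=true means "dual" (synchronous write AND CDC),
// cdc.synchronous=false means "cdc-only". The PR's real resolution logic may differ.
final class CdcModeSketch {
    enum Mode { SYNCHRONOUS_ONLY, DUAL, CDC_ONLY }

    static Mode resolve(Properties cfg, String indexName) {
        String prefix = "index." + indexName + ".cdc.";
        // CDC is opt-in, so a missing "enabled" key is treated as false.
        boolean enabled = Boolean.parseBoolean(cfg.getProperty(prefix + "enabled", "false"));
        // The default used here for "synchronous" is an assumption.
        boolean synchronous = Boolean.parseBoolean(cfg.getProperty(prefix + "synchronous", "true"));
        if (!enabled) {
            return Mode.SYNCHRONOUS_ONLY;
        }
        return synchronous ? Mode.DUAL : Mode.CDC_ONLY;
    }

    public static void main(String[] args) {
        Properties cfg = new Properties();
        cfg.setProperty("storage.cql.cdc", "true");               // emit cdc=true on edgestore
        cfg.setProperty("index.search.cdc.enabled", "true");      // "search" is a hypothetical index name
        cfg.setProperty("index.search.cdc.synchronous", "false");
        System.out.println(resolve(cfg, "search"));
    }
}
```

Treating a missing `enabled` key as `false` mirrors the opt-in behavior described in the overview.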
Reviewed changes
Copilot reviewed 33 out of 33 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| pom.xml | Registers new janusgraph-cdc Maven module in the build. |
| mkdocs.yml | Adds CDC operator guide page to documentation nav. |
| janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/MixedIndexUpdateApplierTest.java | Validates reindex-from-current-state behavior over Lucene in cdc-only mode. |
| janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/CdcSkipMutationTest.java | Verifies synchronous mixed-index write is skipped in cdc-only and retained in dual mode. |
| janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLCdcTableOptionTest.java | Unit-tests that storage.cql.cdc toggles cdc=true on edgestore DDL only. |
| janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java | Refactors CREATE TABLE building and conditionally applies Cassandra cdc=true for edgestore. |
| janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java | Adds storage.cql.cdc configuration option. |
| janusgraph-core/src/test/java/org/janusgraph/graphdb/configuration/CdcIndexConfigTest.java | Tests defaults and per-index scoping of index.[X].cdc.*. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java | Computes cdc-only backing indexes and skips synchronous mixed-index writes for them. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/log/TransactionLogHeader.java | Makes TransactionLogHeader.Modification constructor public for reuse in decoding. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/index/MixedIndexUpdateApplier.java | Adds backend-agnostic reindex-from-current-state applier for CDC worker. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/index/CdcElementChange.java | Adds normalized “element changed” model consumed by the applier/worker. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java | Adds index.[X].cdc.enabled and index.[X].cdc.synchronous options. |
| janusgraph-cdc/src/test/resources/cassandra-cdc.yaml | Provides Cassandra config enabling CDC for full pipeline E2E test. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/DebeziumCassandraJsonDecoderTest.java | Tests Debezium JSON decoding against real JanusGraph-serialized bytes. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcWorkerConvergenceTest.java | Drives worker+decoder+applier via MockConsumer to validate convergence semantics. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcWorkerConfigurationTest.java | Tests worker configuration defaults and properties parsing. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcKafkaElasticsearchTest.java | Testcontainers E2E for Kafka → worker → ElasticSearch convergence. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcIndexUpdateWorkerMainTest.java | Tests runner wiring and config reflection of CDC-enabled backing indexes. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcIndexUpdateWorkerLoopTest.java | Unit-tests polling loop semantics (dedupe/retry/commit/rewind). |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcEventDecoderTest.java | Smoke test for decoder SPI and CdcElementChange interop. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcCassandraDebeziumElasticsearchTest.java | Full Cassandra CDC → Debezium → Kafka → ElasticSearch pipeline E2E (profile-gated). |
| janusgraph-cdc/src/test/java/io/debezium/connector/cassandra/JanusGraphCdcConnectorStarter.java | Test-only bridge to start Debezium Cassandra connector embedded. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/DebeziumCassandraJsonDecoder.java | Implements Debezium Cassandra JSON → CdcElementChange decoding. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcWorkerConfiguration.java | Defines immutable worker/Kafka configuration and consumer properties. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorkerMain.java | Adds standalone runner that opens graph, wires decoder+applier, starts workers. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorker.java | Implements Kafka consume/decode/dedupe/apply/retry/commit/rewind loop. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexApplier.java | Functional interface to abstract index application for worker tests. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcEventDecoder.java | Decoder SPI for CDC record formats. |
| janusgraph-cdc/pom.xml | New module POM with Kafka clients + testcontainers/Debezium profile gating. |
| docs/configs/janusgraph-cfg.md | Regenerates config reference including new CDC options. |
| docs/changelog.md | Adds 1.2.0 upgrade note describing CDC mixed index synchronization. |
| docs/advanced-topics/cdc-mixed-index.md | Adds operator guide for Cassandra CDC + Debezium + Kafka + worker setup. |
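The consume/dedupe/apply/commit/rewind cycle attributed to `CdcIndexUpdateWorker` in the table can be sketched without Kafka. The following is an in-memory stand-in, not the PR's implementation: the list plays the role of a partition, `committedOffset` plays the role of the consumer group offset, and all names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Illustrative only: at-least-once batch processing with de-duplication.
final class AtLeastOnceLoopSketch {
    private final List<String> log;   // stand-in for a Kafka partition of changed element ids
    private int committedOffset = 0;  // next offset to read after a restart or rewind

    AtLeastOnceLoopSketch(List<String> log) {
        this.log = log;
    }

    int committedOffset() {
        return committedOffset;
    }

    /** Processes one batch; returns true if it was applied and committed, false if rewound. */
    boolean pollOnce(int maxBatch, Consumer<Set<String>> applier) {
        int end = Math.min(log.size(), committedOffset + maxBatch);
        // De-duplicate: one reindex per element id, however many events mention it.
        Set<String> elementIds = new LinkedHashSet<>(log.subList(committedOffset, end));
        try {
            applier.accept(elementIds);
        } catch (RuntimeException e) {
            // Rewind: the offset is left untouched, so the same batch is read again.
            return false;
        }
        // Commit only after the whole batch has been applied.
        committedOffset = end;
        return true;
    }

    public static void main(String[] args) {
        AtLeastOnceLoopSketch loop =
            new AtLeastOnceLoopSketch(Arrays.asList("v1", "v2", "v1", "e7"));
        boolean first = loop.pollOnce(10, ids -> {
            throw new IllegalStateException("index backend unavailable");
        });
        System.out.println("first attempt committed=" + first + ", offset=" + loop.committedOffset());
        boolean second = loop.pollOnce(10, ids -> System.out.println("bulk reindex of " + ids));
        System.out.println("second attempt committed=" + second + ", offset=" + loop.committedOffset());
    }
}
```

Because the offset only advances after a successful apply, a failed batch is retried rather than skipped, which is safe only if applying the same element twice is harmless.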
Pull request overview
Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/MixedIndexUpdateApplierTest.java:1
- The 10-second schema/index enablement timeout is likely to be flaky under loaded CI runners (especially with filesystem-backed Lucene + BerkeleyJE). Consider increasing these timeouts (e.g., 30–60 seconds) or using a shared constant aligned with other JanusGraph index-status tests. The same concern applies to other new tests using 10-second `awaitGraphIndexStatus` timeouts.
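One way to act on this suggestion is a single shared timeout that tests read instead of hard-coding 10 seconds. This is only a sketch: the `IndexStatusTimeouts` class, the 60-second default and the system property name are hypothetical and not part of JanusGraph.

```java
import java.time.Duration;

// Illustrative only: one shared timeout for index-status waits in tests.
final class IndexStatusTimeouts {
    // Hypothetical property name; lets slow CI runners raise the limit without code changes.
    static final String PROPERTY = "test.index-status-timeout-seconds";
    static final long DEFAULT_SECONDS = 60;

    static Duration indexStatusTimeout() {
        // Long.getLong falls back to the default when the property is unset or not a number.
        return Duration.ofSeconds(Long.getLong(PROPERTY, DEFAULT_SECONDS));
    }

    public static void main(String[] args) {
        System.out.println("index status timeout: " + indexStatusTimeout());
    }
}
```

Each test would then pass this duration to its `awaitGraphIndexStatus` wait rather than a literal value.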
Pull request overview
Copilot reviewed 35 out of 35 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLCdcTableOptionTest.java:1
- This assertion is fairly broad and could pass if an unrelated substring containing 'cdc' appears in the DDL. Tightening it to assert the specific table option (e.g., matching `cdc = true` / `WITH cdc = true`) would make the test more robust and less prone to false positives.
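A tighter check could match the table option itself rather than any occurrence of `cdc`. The sketch below assumes the DDL renders the option as `WITH cdc = true` or `AND cdc = true` with arbitrary spacing; the exact text produced by the CQL query builder is not shown in this PR page, and the class name and sample DDL strings are hypothetical.

```java
import java.util.regex.Pattern;

// Illustrative only: asserts the cdc table option, not just the substring "cdc".
final class CdcDdlAssertionSketch {
    // Matches "WITH cdc = true" or "AND cdc=true", case-insensitive, any spacing around '='.
    private static final Pattern CDC_OPTION =
        Pattern.compile("\\b(WITH|AND)\\s+cdc\\s*=\\s*true\\b", Pattern.CASE_INSENSITIVE);

    static boolean enablesCdc(String ddl) {
        return CDC_OPTION.matcher(ddl).find();
    }

    public static void main(String[] args) {
        String edgestore = "CREATE TABLE ks.edgestore (key blob, column1 blob, value blob, "
            + "PRIMARY KEY (key, column1)) WITH cdc = true";
        String unrelated = "CREATE TABLE ks.cdc_audit (key blob PRIMARY KEY) WITH comment = 'cdc'";
        System.out.println(enablesCdc(edgestore));
        System.out.println(enablesCdc(unrelated));
    }
}
```

The second sample contains `cdc` twice but never as a table option, which is the false positive the review comment describes.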
Keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with the graph by deriving their updates from a Change-Data-Capture stream of the committed graph data, instead of a synchronous second write during the transaction that can diverge on failure and leave a permanently stale index.

Pipeline (Apache Cassandra): commit (graph data only) -> Cassandra edgestore(cdc=true) -> Debezium -> Kafka -> CdcIndexUpdateWorker consumer group -> reindex-from-current-state -> ES bulk.

Key design points:
- Reindex-from-current-state: the worker reads each changed element's current graph state and fully replaces its index document (reusing IndexSerializer, like transaction recovery). This is idempotent and order-independent, so out-of-order or duplicate events still converge to the current state -- no strict ordering required, and a stale event can never overwrite a fresh value.
- No dual write: the only synchronous write is to storage; the index is updated downstream from the committed change stream, so it cannot diverge.
- Element-keyed Kafka partitioning gives per-element ordering and horizontal scaling via a consumer group; batches are de-duplicated and applied as one ElasticSearch _bulk per index.
- At-least-once: offsets are committed only after a batch is durably applied; on failure the batch is reprocessed (rewind) rather than skipped, so the index eventually catches up.

Configuration (opt-in, disabled by default):
- storage.cql.cdc: emit the Cassandra cdc=true table option on the edgestore table.
- index.[X].cdc.enabled / index.[X].cdc.synchronous: per-index dual mode (write synchronously AND via CDC) or cdc-only mode (skip the synchronous write; ES updated solely via CDC).

Components:
- janusgraph-core: per-index CDC config options, the commit-side skip hook in StandardJanusGraph (with a startup warning when cdc-only mode is configured), and MixedIndexUpdateApplier (the backend-agnostic reindex-from-current-state engine, covering vertex, edge and property-element mixed indexes). The restore paths (ElementCategory.retrieve, IndexSerializer.removeElement) now accept custom String vertex ids in addition to Long, and RelationIdentifierUtils.findRelation no longer NPEs when a relation's adjacent vertex has been removed.
- janusgraph-cql: the storage.cql.cdc table option (no Kafka dependency in production code).
- janusgraph-cdc (new module; core + kafka-clients): the CdcEventDecoder SPI, DebeziumCassandraJsonDecoder (parses the relation header directly, so IN-direction edge columns and value-less delete tombstones of MULTI edges resolve to the correct edge identity), the CdcWorkerConfiguration, the CdcIndexUpdateWorker (two-phase shutdown, interrupt-aware retries, no consumer leaks), and the standalone CdcIndexUpdateWorkerMain runner.

The deletion limits of cdc-only mode for constrained-multiplicity edge indexes and meta-property indexes are documented.

Testing: 54 tests, including unit/component coverage (decoder vs real serialized bytes incl. poison-pill skips for invalid-Base64/malformed-key, delete-envelope after=null/before fallback, IN-direction columns and value-less edge-delete tombstones; reindex engine over vertex/edge/property-element indexes incl. document removal when an element loses all indexed fields, removed-endpoint edges, and custom String vertex ids; worker loop via Kafka MockConsumer incl. at-least-once rewind on decode/apply failure and leak-free lifecycle; fail-fast config and CDC-enablement validation; full-chain convergence over Lucene incl. vertex/edge add/update/remove and out-of-order delivery) and two real-container E2Es -- worker -> Kafka -> ElasticSearch, and the full Cassandra-CDC -> Debezium -> Kafka -> ElasticSearch pipeline covering the vertex AND edge lifecycle (add/update/property-removal/delete) against real Debezium delete envelopes. The real-container tests are gated behind the cassandra-cdc-e2e Maven profile (auto-activated on Java 17+, required by Debezium 3.x and Testcontainers 2.x); the default Java 8/11 build excludes them and stays green.

CI: a dedicated workflow (.github/workflows/ci-cdc.yml) runs the cdc unit tests on Java 8 and 11 and the full real-container suite -- including the Cassandra-CDC -> Debezium -> Kafka -> ElasticSearch pipeline -- on Java 17 with Docker, so the integration is exercised on every change and guards against regression.

Docs: advanced-topics/cdc-mixed-index.md operator guide, a 1.2.0 changelog upgrade note, and the regenerated configuration reference.

Fixes JanusGraph#4873
Replaces JanusGraph#4874

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
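The reindex-from-current-state idea in the commit message can be illustrated with plain maps. In this sketch a change event carries only an element id; the applier looks up the element's current state and either replaces or removes the index document. The two maps stand in for the graph and the mixed index, and none of the names come from the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: events name the changed element, never the new value.
final class ReindexFromCurrentStateSketch {
    static void apply(Map<String, String> graph, Map<String, String> index, List<String> changedIds) {
        for (String id : changedIds) {
            String current = graph.get(id);
            if (current == null) {
                index.remove(id);        // element is gone: drop its document
            } else {
                index.put(id, current);  // full document replacement from current state
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> graph = new HashMap<>();
        graph.put("v1", "name=bob");             // v1 was updated, v2 was deleted

        Map<String, String> index = new HashMap<>();
        index.put("v1", "name=alice");           // stale document
        index.put("v2", "name=carol");           // document for a deleted element

        // Duplicate and out-of-order events still converge to the graph's state.
        apply(graph, index, Arrays.asList("v2", "v1", "v1", "v2"));
        System.out.println(index.equals(graph));
    }
}
```

Since each event triggers a read of the latest state, replaying or reordering events cannot write an older value over a newer one.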
    public static final String RETRY_LIMIT = "cdc.retry.limit";
    public static final String RETRY_INITIAL_WAIT_MS = "cdc.retry.initial-wait-ms";
    public static final String RETRY_MAX_WAIT_MS = "cdc.retry.max-wait-ms";
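The three retry keys suggest a bounded retry with a growing, capped wait. The sketch below assumes the wait doubles after each failed attempt and is capped at the maximum; the PR page does not show the actual backoff policy, so the doubling rule, the class name and the sample values are all assumptions.

```java
// Illustrative only: capped exponential backoff driven by values such as
// cdc.retry.limit, cdc.retry.initial-wait-ms and cdc.retry.max-wait-ms.
final class RetryBackoffSketch {
    /**
     * Wait before the given retry attempt (1-based): the first retry waits
     * initialWaitMs, each later retry doubles the wait, capped at maxWaitMs.
     */
    static long waitBeforeRetryMs(int attempt, long initialWaitMs, long maxWaitMs) {
        long wait = Math.min(initialWaitMs, maxWaitMs);
        for (int i = 1; i < attempt && wait < maxWaitMs; i++) {
            // Compare against half the cap first so the doubling cannot overflow.
            wait = (wait > maxWaitMs / 2) ? maxWaitMs : wait * 2;
        }
        return wait;
    }

    public static void main(String[] args) {
        int retryLimit = 6;        // hypothetical value for cdc.retry.limit
        long initialWaitMs = 100;  // hypothetical value for cdc.retry.initial-wait-ms
        long maxWaitMs = 1000;     // hypothetical value for cdc.retry.max-wait-ms
        for (int attempt = 1; attempt <= retryLimit; attempt++) {
            System.out.println("retry " + attempt + " waits "
                + waitBeforeRetryMs(attempt, initialWaitMs, maxWaitMs) + " ms");
        }
    }
}
```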
    private Collection<CdcElementChange> resolveChanges(Object vertexId, byte[] columnBytes, byte[] valueBytes) {
        final StaticBuffer column = new StaticArrayBuffer(columnBytes);
        final StaticBuffer value = new StaticArrayBuffer(valueBytes != null ? valueBytes : new byte[0]);
        final Entry entry = StaticArrayEntry.of(column, value);
        final StandardJanusGraphTx tx = (StandardJanusGraphTx) graph.newTransaction();
        try {
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
Keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with the graph by deriving their updates from a Change-Data-Capture stream of the committed graph data, instead of a synchronous second write during the transaction that can diverge on failure and leave a permanently stale index.
Pipeline (Apache Cassandra): commit (graph data only) -> Cassandra edgestore(cdc=true) -> Debezium -> Kafka -> CdcIndexUpdateWorker consumer group -> reindex-from-current-state -> ES bulk.
Key design points:
Configuration (opt-in, disabled by default):
Components:
Testing: 38 tests, including unit/component coverage (decoder vs real serialized bytes, reindex engine, worker loop via Kafka MockConsumer, full-chain convergence over Lucene incl. vertex/edge add/update/remove and out-of-order delivery) and two real-container E2Es -- worker -> Kafka -> ElasticSearch, and the full Cassandra-CDC -> Debezium -> Kafka -> ElasticSearch pipeline. The full Debezium E2E is gated behind the cassandra-cdc-e2e Maven profile (auto-activated on Java 17+, required by Debezium 3.x and Testcontainers 2.x); the default Java 8/11 build excludes it and stays green.
Docs: advanced-topics/cdc-mixed-index.md operator guide, a 1.2.0 changelog upgrade note, and the regenerated configuration reference.
Fixes #4873
Replaces #4874