fix(mpool): MempoolFilter returns the pending transactions#7250
fix(mpool): MempoolFilter returns the pending transactions#7250akaladarshi wants to merge 4 commits into
MempoolFilter returns the pending transactions#7250Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (5)
🔗 Linked repositories identifiedCodeRabbit considers these linked repositories for cross-repo context during reviews:
🚧 Files skipped from review as they are similar to previous changes (3)
WalkthroughPending-transaction filtering now uses a subscriber-backed mempool stream, buffers hashes per filter, and drains them through the Ethereum RPC path. Handler construction passes chain id and mempool subscriber through daemon and offline-server wiring. Pubsub and stateful tests are updated to match the new hash flow. ChangesPending transaction hash flow
Sequence Diagram(s)sequenceDiagram
participant RPCClient
participant MempoolFilterManager
participant pending_tx_added_hashes
participant MpoolSubscriber
participant MempoolFilter
RPCClient->>MempoolFilterManager: install()
MempoolFilterManager->>pending_tx_added_hashes: start fan-out task(eth_chain_id, subscriber)
pending_tx_added_hashes->>MpoolSubscriber: subscribe()
MpoolSubscriber-->>pending_tx_added_hashes: MpoolUpdate::Add
pending_tx_added_hashes-->>MempoolFilterManager: EthHash
MempoolFilterManager->>MempoolFilter: push(hash)
RPCClient->>MempoolFilterManager: remove()/eth_getFilterChanges
MempoolFilterManager->>MempoolFilter: drain()
MempoolFilter-->>RPCClient: Vec<EthHash>
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/rpc/methods/eth/filter/mempool.rs`:
- Around line 256-258: The async test in the mempool filter flow should not rely
on a single tokio::task::yield_now() to let the fan-out task deliver the
broadcast event. Update the test around the filter subscription/drain logic in
the mempool.rs path to poll with a small timeout until both filters have
received the hash, using the existing filter/broadcast setup instead of assuming
one scheduler yield is sufficient.
- Around line 119-128: The fan-out task startup in ensure_fanout_task currently
calls tokio::spawn unconditionally, which can panic if there is no active Tokio
runtime. Update the install path that reaches ensure_fanout_task to check for a
runtime first and return an installation error instead of spawning blindly, and
make sure the caller of ensure_fanout_task propagates that Result so the
existing error handling is preserved.
In `@src/rpc/methods/eth/filter/mod.rs`:
- Around line 142-146: `EthEventHandler::new()` is wiring pending-filter RPCs to
`MpoolSubscriber::dummy()`, so `eth_newPendingTransactionFilter` never receives
mempool adds in states built from it. Update the RPC state constructors that use
pending filters, especially the snapshot/test paths in `generate_test_snapshot`
and `test_snapshot`, to pass a real `MessagePool` subscriber
(`mpool.subscriber()`) into `EthEventHandler::from_config(...)` instead of
relying on the dummy subscriber. Make sure the `EthEventHandler` and related
`RPCState` setup still use the same `EventsConfig` and chain ID, but are
connected to the live mempool bus.
In `@src/tool/subcommands/api_cmd/stateful_tests.rs`:
- Around line 1041-1044: The pending-tx assertions are still happening after
chain inclusion because wait_pending_message() continues into StateWaitMsg, so
the tests do not actually verify pre-mining mempool behavior. Update the
stateful tests to inspect the filter immediately after the first MpoolPending
observation, or split wait_pending_message() into a helper that returns once the
CID appears so the test can stop before StateWaitMsg. Apply the same fix in both
affected test sections that use wait_pending_message() and
poll_pending_filter_until().
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: b1fab06a-d564-4511-ba0e-b6e9944153cd
📒 Files selected for processing (14)
CHANGELOG.mdsrc/daemon/mod.rssrc/message_pool/msgpool/events.rssrc/message_pool/msgpool/mod.rssrc/message_pool/msgpool/msg_pool.rssrc/message_pool/msgpool/pending_store.rssrc/message_pool/msgpool/selection.rssrc/rpc/methods/eth.rssrc/rpc/methods/eth/filter/mempool.rssrc/rpc/methods/eth/filter/mod.rssrc/rpc/methods/eth/pubsub.rssrc/tool/offline_server/server.rssrc/tool/subcommands/api_cmd/stateful_tests.rssrc/utils/task/mod.rs
🔗 Linked repositories identified
CodeRabbit considers these linked repositories for cross-repo context during reviews:
filecoin-project/lotus(manual)
| fn ensure_fanout_task(&self) { | ||
| self.fanout_task.get_or_init(|| { | ||
| let filters = self.filters.shallow_clone(); | ||
| let mut hashes = | ||
| pending_tx_added_hashes(self.subscriber.subscribe(), self.eth_chain_id); | ||
| let task = tokio::spawn(async move { | ||
| while let Some(hash) = hashes.next().await { | ||
| fan_out(&filters.read(), hash); | ||
| } | ||
| }); |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo '--- mempool.rs outline ---'
ast-grep outline src/rpc/methods/eth/filter/mempool.rs --view expanded || true
echo '--- ensure_fanout_task occurrences ---'
rg -n "ensure_fanout_task|fn install|eth_new_pending_transaction_filter|tokio::spawn" src/rpc/methods/eth/filter -S || trueRepository: ChainSafe/forest
Length of output: 3074
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo '--- mempool.rs relevant lines ---'
sed -n '110,150p' src/rpc/methods/eth/filter/mempool.rs | cat -n
echo '--- mod.rs relevant lines ---'
sed -n '210,340p' src/rpc/methods/eth/filter/mod.rs | cat -n
echo '--- tests around pending filter ---'
sed -n '1220,1285p' src/rpc/methods/eth/filter/mod.rs | cat -nRepository: ChainSafe/forest
Length of output: 10232
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo '--- install_filter call sites ---'
rg -n "install_filter\(" src/rpc/methods/eth -S || true
echo '--- eth_new_pending_transaction_filter implementation ---'
sed -n '230,275p' src/rpc/methods/eth/filter/mod.rs | cat -n
echo '--- FilterManager trait and implementations ---'
sed -n '70,110p' src/rpc/methods/eth/filter/mod.rs | cat -nRepository: ChainSafe/forest
Length of output: 5330
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo '--- install_filter call sites ---'
rg -n "install_filter\(" src/rpc/methods/eth -S || true
echo '--- mod.rs around install_filter and eth_new_pending_transaction_filter ---'
sed -n '220,270p' src/rpc/methods/eth/filter/mod.rs | cat -nRepository: ChainSafe/forest
Length of output: 3372
Guard the fan-out task spawn with a runtime check
ensure_fanout_task calls tokio::spawn from a synchronous install path, so a caller without an active Tokio runtime will panic instead of getting the existing Result. Propagate an installation error here instead.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/rpc/methods/eth/filter/mempool.rs` around lines 119 - 128, The fan-out
task startup in ensure_fanout_task currently calls tokio::spawn unconditionally,
which can panic if there is no active Tokio runtime. Update the install path
that reaches ensure_fanout_task to check for a runtime first and return an
installation error instead of spawning blindly, and make sure the caller of
ensure_fanout_task propagates that Result so the existing error handling is
preserved.
| // The receiver is subscribed during install, so the event is already | ||
| // buffered; one yield lets the fan-out task drain it into both filters. | ||
| tokio::task::yield_now().await; |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win
Avoid relying on a single scheduler yield in this async test.
One yield_now() does not guarantee the fan-out task has drained and pushed the broadcast event, so this can be flaky under load. Poll with a small timeout until both filters receive the hash.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/rpc/methods/eth/filter/mempool.rs` around lines 256 - 258, The async test
in the mempool filter flow should not rely on a single tokio::task::yield_now()
to let the fan-out task deliver the broadcast event. Update the test around the
filter subscription/drain logic in the mempool.rs path to poll with a small
timeout until both filters have received the hash, using the existing
filter/broadcast setup instead of assuming one scheduler yield is sufficient.
Summary of changes
Changes introduced in this pull request:
Reference issue to close (if applicable)
Closes
Other information and links
Change checklist
Outside contributions
Summary by CodeRabbit
New Features
Bug Fixes
eth_newPendingTransactionFilternow returns hashes for truly pending mempool transactions (not on-chain execution events).eth_getFilterChangesnow returns only newly observed pending transaction hashes per poll, avoiding duplicates across polls.