Description
A fan-in barrier built with AddFanInBarrierEdge drops the contributions it already holds when you checkpoint and resume the run. One source delivers its message to the barrier. The run then checkpoints before the second source arrives, which is what happens when the workflow suspends on a RequestPort for human input. On resume the barrier has forgotten the first contribution. The second source arrives, the barrier still waits on the first, and it never completes: its target executor never runs and the run produces no output.
The barrier's own serialization holds up. The bug sits one level up: CheckpointAsync exports a different EdgeMap instance than the one that buffers delivered messages, so the persisted barrier always reads empty. The Additional Context section traces it.
What happened: the barrier's target never runs after resume, and the run yields no output.
Expected: once the second source arrives after resume, the barrier completes and hands every contribution, from before and after the pause, to its target.
Steps to reproduce:
- Build a workflow with a fan-in barrier whose target has two or more sources: AddFanInBarrierEdge([sourceA, sourceB], target).
- Let sourceA deliver to the barrier, then suspend the run on a RequestPort before sourceB delivers.
- Run with checkpointing and capture the checkpoint emitted at the suspend superstep.
- Call ResumeStreamingAsync from that checkpoint and answer the request so sourceB delivers.
- Watch the barrier target: it never runs. Only sourceB's contribution exists, and the barrier stays blocked on sourceA.
The trigger is broader than human-in-the-loop. Any checkpoint taken while a barrier is half-satisfied loses the buffered contributions; RequestPort is the most common reason a checkpoint lands there. The framework raises no exception, so the failure is silent.
Code Sample
A self-contained reproduction with three executors and one RequestPort. It prints BUG or PASS and exits non-zero on the bug.
using Microsoft.Agents.AI.Workflows;
// Repro: a fan-in barrier (AddFanInBarrierEdge) loses a contribution that was
// buffered BEFORE a checkpoint/resume, so after resume the barrier never
// completes and its target never runs.
//
// A ──contributes "A"──────────────────────────┐ (buffered before the pause)
// └──asks the human (RequestPort) ──suspend──> B ┤ FAN-IN BARRIER ──> Sink
// (reply) (contributes "B" after resume)
//
// Expected: after answering the request, the barrier releases both A and B to Sink.
// Actual: only B is produced after resume; A was dropped, the barrier never
// completes, and Sink never runs.
static Workflow Build()
{
    var a = new A();
    var b = new B();
    var sink = new Sink();
    var port = RequestPort.Create<Ask, Reply>("approval");
    var builder = new WorkflowBuilder(a);
    builder.AddEdge(a, port); // A asks the human -> run suspends
    builder.AddEdge(port, b); // human reply -> B
    builder.AddFanInBarrierEdge(new ExecutorBinding[] { a, b }, sink); // barrier: waits for A and B
    return builder.Build();
}
var manager = CheckpointManager.CreateInMemory();
var env = InProcessExecution.Lockstep.WithCheckpointing(manager);
// 1) Run until the human-input request suspends the workflow; grab that checkpoint.
CheckpointInfo? atSuspend = null;
bool sawRequest = false;
await using (var run = await env.RunStreamingAsync(Build(), "start", "session-1"))
{
    await foreach (var evt in run.WatchStreamAsync())
    {
        if (evt is RequestInfoEvent) sawRequest = true;
        if (evt is SuperStepCompletedEvent { CompletionInfo.Checkpoint: { } cp }) atSuspend = cp;
        if (sawRequest && atSuspend is not null) break; // suspended with a checkpoint in hand
    }
}
if (atSuspend is null) { Console.WriteLine("inconclusive: never suspended"); return 2; }
// 2) Resume from that checkpoint and answer the request so B contributes.
Sink.Received.Clear();
await using (var run = await env.ResumeStreamingAsync(Build(), atSuspend))
{
    var answered = false;
    await foreach (var evt in run.WatchStreamAsync())
    {
        if (!answered && evt is RequestInfoEvent req)
        {
            await run.SendResponseAsync(req.Request.CreateResponse(new Reply("yes")));
            answered = true;
        }
        else if (answered && evt is SuperStepCompletedEvent { CompletionInfo: { HasPendingMessages: false, HasPendingRequests: false } })
            break;
    }
}
// 3) Did the barrier release both contributions to Sink?
Console.WriteLine($"Sink received after resume: {{{string.Join(",", Sink.Received.OrderBy(x => x))}}}");
var ok = Sink.Received.SetEquals(new[] { "A", "B" });
Console.WriteLine(ok
    ? "PASS: barrier completed (A and B both reached Sink)."
    : "BUG: A was buffered before the pause but lost across resume; the barrier never completed and Sink never ran.");
return ok ? 0 : 1;
record Ask(string Prompt);
record Reply(string Answer);
record Item(string Source);
// A is the start executor AND the first barrier source: it contributes immediately,
// then asks the human in the same step (which suspends the run).
sealed class A() : Executor("A")
{
    protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder pb) =>
        pb.ConfigureRoutes(r => r.AddHandler<string>(OnStart)).SendsMessage<Item>().SendsMessage<Ask>();

    private async ValueTask OnStart(string input, IWorkflowContext ctx)
    {
        Console.WriteLine("[A] contributing 'A' to the barrier, then asking the human (suspends)");
        await ctx.SendMessageAsync(new Item("A")); // -> fan-in barrier (buffered)
        await ctx.SendMessageAsync(new Ask("approve?")); // -> RequestPort (suspends)
    }
}
// B is the second barrier source: it contributes only after the human replies.
sealed class B() : Executor("B")
{
    protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder pb) =>
        pb.ConfigureRoutes(r => r.AddHandler<Reply>(OnReply)).SendsMessage<Item>();

    private ValueTask OnReply(Reply reply, IWorkflowContext ctx)
    {
        Console.WriteLine("[B] contributing 'B' to the barrier (after resume)");
        return ctx.SendMessageAsync(new Item("B"));
    }
}
// Sink is the fan-in barrier target; the framework runs it once per contribution when the barrier completes.
sealed class Sink() : Executor("sink")
{
    public static readonly HashSet<string> Received = new();

    protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder pb) =>
        pb.ConfigureRoutes(r => r.AddHandler<Item>(OnItem)).YieldsOutput<string>();

    private ValueTask OnItem(Item item, IWorkflowContext ctx)
    {
        Received.Add(item.Source);
        return default;
    }
}
Repro.csproj:
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net9.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Agents.AI.Workflows" Version="1.9.0" />
  </ItemGroup>
</Project>
Error Messages / Stack Traces
The framework raises no exception. The barrier target never runs, so the failure is silent. Program output:
[A] contributing 'A' to the barrier, then asking the human (suspends)
[B] contributing 'B' to the barrier (after resume)
Sink received after resume: {}
BUG: A was buffered before the pause but lost across resume; the barrier never completed and Sink never ran.
A and B both run, yet Sink receives nothing. The resume path works; the barrier is what fails.
A custom JsonCheckpointStore shows what the suspend checkpoint persisted for the barrier edge. A had already reached the barrier live before the suspend, yet the stored state holds no message and still lists both sources as unseen:
// fan-in edge state inside the suspend checkpoint:
{"sourceIds":["A","B"],"unseen":["A","B"],"pendingMessages":[]}
// "unseen" still lists both sources, and "pendingMessages" is empty: A's buffered message is gone.
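A dump like the one above can also be checked mechanically. The following is a minimal sketch, not part of the framework: BarrierStateCheck and its method names are hypothetical, and it assumes only the three property names visible in the dump (sourceIds, unseen, pendingMessages). It reports which sources the persisted barrier believes it has already heard from and how many messages it holds.

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Hypothetical helper for inspecting a persisted fan-in edge state.
// Assumes the JSON shape shown in the dump: sourceIds, unseen, pendingMessages.
static class BarrierStateCheck
{
    // Sources listed in "sourceIds" that are no longer listed in "unseen".
    public static string[] SeenSources(string edgeStateJson)
    {
        using JsonDocument doc = JsonDocument.Parse(edgeStateJson);
        JsonElement root = doc.RootElement;
        var unseen = root.GetProperty("unseen").EnumerateArray()
            .Select(e => e.GetString() ?? "")
            .ToHashSet();
        return root.GetProperty("sourceIds").EnumerateArray()
            .Select(e => e.GetString() ?? "")
            .Where(id => !unseen.Contains(id))
            .ToArray();
    }

    // Number of buffered contributions in "pendingMessages".
    public static int PendingCount(string edgeStateJson)
    {
        using JsonDocument doc = JsonDocument.Parse(edgeStateJson);
        return doc.RootElement.GetProperty("pendingMessages").GetArrayLength();
    }
}
```

Fed the persisted state from the suspend checkpoint, SeenSources returns an empty array and PendingCount returns 0, which is indistinguishable from a barrier that never received anything. A state that had kept A's contribution would presumably list only B as unseen and hold one pending message; that expected shape is an inference from the field names, not something the framework documents here.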
Package Versions
Microsoft.Agents.AI.Workflows 1.9.0 (latest). The affected code is unchanged since 1.1.0, so every release from 1.1.0 onward carries the bug.
.NET Version
.NET 10. The defect is in the managed assembly, so every shipped TFM (net8.0, net9.0, net10.0) reproduces it.
Additional Context
Root cause: the checkpoint serializes a different EdgeMap than the one that buffers messages.
InProcessRunner builds two EdgeMap instances:
- Delivery map (InProcessRunnerContext._edgeMap): every message routes through it via SendMessageAsync → PrepareDeliveryForEdgeAsync → FanInEdgeRunner.ChaseEdgeAsync → FanInEdgeState.ProcessMessage, and ProcessMessage buffers the contribution into PendingMessages.
- Checkpoint map (InProcessRunner.EdgeMap): a separate instance that only CheckpointAsync.ExportStateAsync() and RestoreCheckpointCoreAsync.ImportStateAsync() touch.
No message reaches the checkpoint map, so its barriers stay pristine and ExportStateAsync writes an empty barrier (pendingMessages: []) every time. On resume the surviving source arrives alone and the barrier blocks forever. (InProcessRunnerContext.ExportStateAsync saves the queued messages, executors, and requests, but never _edgeMap.)
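The two-instance mismatch described above can be reproduced without the framework. The sketch below is a toy model under stated assumptions: ToyBarrier and TwoMapDemo are hypothetical names, and the barrier logic is a simplified stand-in, not the framework's FanInEdgeState or EdgeMap. One instance plays the delivery map, the other plays the checkpoint map.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy stand-in for a two-source fan-in barrier. Hypothetical type for illustration,
// not the framework's FanInEdgeState.
sealed class ToyBarrier
{
    private readonly HashSet<string> _unseen;
    private readonly List<string> _pending;

    public ToyBarrier(IEnumerable<string> unseen, IEnumerable<string> pending)
    {
        _unseen = new HashSet<string>(unseen);
        _pending = new List<string>(pending);
    }

    // Buffers one contribution and reports whether every source has now arrived.
    public bool Process(string source, string message)
    {
        _pending.Add(message);
        _unseen.Remove(source);
        return _unseen.Count == 0;
    }

    // Snapshot and restore cover this instance only.
    public (string[] Unseen, string[] Pending) Export() =>
        (_unseen.OrderBy(s => s).ToArray(), _pending.ToArray());

    public static ToyBarrier Import((string[] Unseen, string[] Pending) state) =>
        new ToyBarrier(state.Unseen, state.Pending);
}

static class TwoMapDemo
{
    // Returns (messages held by the snapshot, whether the barrier completed after resume).
    public static (int SnapshotPending, bool CompletedAfterResume) Run()
    {
        var sources = new[] { "A", "B" };
        var deliveryBarrier = new ToyBarrier(sources, Array.Empty<string>());   // plays the delivery map
        var checkpointBarrier = new ToyBarrier(sources, Array.Empty<string>()); // plays the checkpoint map

        deliveryBarrier.Process("A", "A");           // live delivery buffers A here...
        var snapshot = checkpointBarrier.Export();   // ...but the checkpoint reads the other instance

        var resumed = ToyBarrier.Import(snapshot);   // resume from the empty snapshot
        bool completed = resumed.Process("B", "B");  // B arrives alone; A is still unseen
        return (snapshot.Pending.Length, completed);
    }
}
```

Calling TwoMapDemo.Run() returns (0, false): the snapshot holds no messages and the restored barrier does not complete when B arrives, which mirrors the behaviour reported in this issue.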
Files under dotnet/src/Microsoft.Agents.AI.Workflows/ at the dotnet-1.9.0 tag:
- InProc/InProcessRunner.cs: the EdgeMap ctor, CheckpointAsync, RestoreCheckpointCoreAsync.
- InProc/InProcessRunnerContext.cs: the _edgeMap ctor and PrepareDeliveryForEdgeAsync; its ExportStateAsync omits _edgeMap.
- Execution/{EdgeMap,FanInEdgeRunner,FanInEdgeState}.cs: the barrier state, which works in isolation.
Suggested fix: export and import the context's _edgeMap (or share one instance) so the checkpoint reflects what the run buffered. StateManager already uses a single instance for both delivery and checkpoint; only the fan-in EdgeMap needs the change.
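The shape of that fix can be sketched in a toy model. Everything here is hypothetical and simplified (SketchEdgeMap, BarrierSnapshot, and SharedMapDemo are illustrative names, not framework types); the only point is that the instance that buffers deliveries is the same one the checkpoint exports.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical toy types for illustration; not the framework's EdgeMap or FanInEdgeState.
sealed record BarrierSnapshot(string[] Unseen, string[] Pending);

sealed class SketchEdgeMap
{
    private readonly HashSet<string> _unseen;
    private readonly List<string> _pending;

    public SketchEdgeMap(IEnumerable<string> unseen, IEnumerable<string> pending)
    {
        _unseen = new HashSet<string>(unseen);
        _pending = new List<string>(pending);
    }

    // Buffers a contribution. Returns the full batch once no source is unseen,
    // otherwise an empty array.
    public string[] Deliver(string source, string message)
    {
        _pending.Add(message);
        _unseen.Remove(source);
        if (_unseen.Count > 0) return Array.Empty<string>();
        string[] batch = _pending.ToArray();
        _pending.Clear();
        return batch;
    }

    public BarrierSnapshot Export() =>
        new BarrierSnapshot(_unseen.OrderBy(s => s).ToArray(), _pending.ToArray());

    public static SketchEdgeMap Import(BarrierSnapshot snapshot) =>
        new SketchEdgeMap(snapshot.Unseen, snapshot.Pending);
}

static class SharedMapDemo
{
    // Returns the batch released to the barrier target after resume.
    public static string[] Run()
    {
        // One instance serves both delivery and checkpointing.
        var map = new SketchEdgeMap(new[] { "A", "B" }, Array.Empty<string>());

        map.Deliver("A", "A");                          // buffered before the pause
        BarrierSnapshot snapshot = map.Export();        // checkpoint sees the buffered message

        SketchEdgeMap resumed = SketchEdgeMap.Import(snapshot);
        return resumed.Deliver("B", "B");               // B completes the barrier
    }
}
```

SharedMapDemo.Run() returns the batch A, B: the snapshot carries A across the pause, so B's arrival completes the barrier. Whether the real change is best made by sharing one instance or by exporting the context's map is a design decision for the maintainers; the sketch only shows why either option closes the gap.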
Impact: any fan-in target that receives one source before a checkpoint and another after produces no output on resume, with no error raised. Agent HITL hits it: a source feeds the barrier, the agent asks a human, the run checkpoints, and the post-reply source can never complete the barrier.
Workaround: keep every barrier source on the post-resume side. Thread the pre-pause value through the resuming branch so nothing waits in the barrier across a checkpoint.