Skip to content

fix(deepgram): recover Flux STT WebSocket closes#1813

Open
mykmelez wants to merge 7 commits into
livekit:mainfrom
mykmelez:fix/deepgram-flux-retryable-close
Open

fix(deepgram): recover Flux STT WebSocket closes#1813
mykmelez wants to merge 7 commits into
livekit:mainfrom
mykmelez:fix/deepgram-flux-retryable-close

Conversation

@mykmelez

Copy link
Copy Markdown
Contributor

Summary

Deepgram Flux streaming STT now recovers from unexpected WebSocket closes instead of permanently ending the speech stream. Previously SpeechStreamv2.#recvTask() resolved every WebSocket close event, so a provider-side close such as code 1005 looked like normal stream completion: run() broke out, no retryable error escaped, and the base SpeechStream.mainTask never reconnected.

This matches the Python Flux implementation in livekit-plugins/livekit-plugins-deepgram/livekit/plugins/deepgram/stt_v2.py, which treats local/session shutdown as expected and raises on unexpected Deepgram closes so the base retry path reconnects.

What Changed

  • Track intentional WebSocket closes per socket for stream shutdown, input end, cleanup, and option-update reconnects.
  • Reject unexpected Flux WebSocket closes with a retryable APIConnectionError, letting SpeechStream.mainTask reconnect according to existing connOptions.
  • Bind send/recv tasks to their current WebSocket and wake the send loop when that socket closes, so a retried connection does not inherit stale send work from the previous connection.
  • Added local WebSocket tests for unexpected close retry, normal input-end close, and option-update reconnect with maxRetry: 0.
  • Kept Flux keepalive behavior unchanged. The Python Flux implementation intentionally leaves keepalive disabled; JS V1 STT has its own keepalive/reconnect loop, but that is the non-Flux path.

Retry Semantics Caveat

This uses the existing base stream retry policy. maxRetry defaults to 3 and the counter is lifetime-scoped for a stream, not reset after a healthy connection. For very long sessions with multiple provider-side closes, maintainers may want to decide whether to keep the Python-like base retry behavior, reset retry count after a healthy interval, or expose a dedicated configuration.

Testing

  • pnpm test plugins/deepgram/src/stt_v2.test.ts passed (3 local tests, 1 live Deepgram test skipped without DEEPGRAM_API_KEY).
  • pnpm --filter @livekit/agents-plugin-deepgram lint passed with existing warnings.
  • pnpm --filter @livekit/agents-plugin-deepgram... build passed.
  • pnpm lint passed with existing warnings.
  • pnpm build passed.
  • pnpm test agents/src/utils.test.ts passed.
  • pnpm test currently fails outside this change due provider/API-key-dependent tests (CEREBRAS_API_KEY, Google API key, OpenAI drive-thru examples), a HuggingFace download timeout in plugins/livekit/src/hf_utils.test.ts, and existing FakeLLM unhandled errors in examples/src/testing/survey_agent.test.ts.

Post-Deploy Monitoring & Validation

  • Search logs for Deepgram WebSocket closed unexpectedly, failed to recognize speech, retrying, deepgram.SpeechStreamv2, stt_error, and agent response timed out.
  • Healthy signal: an unexpected Deepgram close is followed by a retry warning and subsequent STT/turn events from the same session.
  • Failure signal: retry exhaustion such as failed to recognize speech after 4 attempts, recurring unrecoverable stt_error, or continued turn-detection stalls after a Deepgram close.
  • Suggested validation window: first release day for sessions using STTv2 with turnDetection: 'stt'.

@changeset-bot

changeset-bot Bot commented Jun 16, 2026

Copy link
Copy Markdown

🦋 Changeset detected

Latest commit: 5493712

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 35 packages
Name Type
@livekit/agents-plugin-deepgram Patch
@livekit/agents Patch
@livekit/agents-plugin-anam Patch
@livekit/agents-plugin-assemblyai Patch
@livekit/agents-plugin-baseten Patch
@livekit/agents-plugin-bey Patch
@livekit/agents-plugin-cartesia Patch
@livekit/agents-plugin-cerebras Patch
@livekit/agents-plugin-did Patch
@livekit/agents-plugin-elevenlabs Patch
@livekit/agents-plugin-fishaudio Patch
@livekit/agents-plugin-google Patch
@livekit/agents-plugin-hedra Patch
@livekit/agents-plugin-hume Patch
@livekit/agents-plugin-inworld Patch
@livekit/agents-plugin-lemonslice Patch
@livekit/agents-plugin-liveavatar Patch
@livekit/agents-plugin-livekit Patch
@livekit/agents-plugin-minimax Patch
@livekit/agents-plugin-mistral Patch
@livekit/agents-plugin-mistralai Patch
@livekit/agents-plugin-neuphonic Patch
@livekit/agents-plugin-openai Patch
@livekit/agents-plugin-perplexity Patch
@livekit/agents-plugin-phonic Patch
@livekit/agents-plugin-resemble Patch
@livekit/agents-plugin-rime Patch
@livekit/agents-plugin-runway Patch
@livekit/agents-plugin-sarvam Patch
@livekit/agents-plugin-silero Patch
@livekit/agents-plugin-soniox Patch
@livekit/agents-plugin-tavus Patch
@livekit/agents-plugin-trugen Patch
@livekit/agents-plugin-xai Patch
@livekit/agents-plugins-test Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

devin-ai-integration[bot]

This comment was marked as resolved.

@devin-ai-integration devin-ai-integration Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 2 new potential issues.

Open in Devin Review

Comment thread plugins/deepgram/src/stt_v2.ts Outdated
Comment on lines +375 to +378
if (hasEnded) {
this.#audioDurationCollector.flush();
break;
}

@devin-ai-integration devin-ai-integration Bot Jun 17, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Info: Behavioral change: FLUSH_SENTINEL with empty AudioByteStream no longer terminates the stream

The old sendTask had a subtle behavior where receiving FLUSH_SENTINEL when the AudioByteStream buffer was empty would cause hasEnded to stay true (because the frame loop never executed to reset it), leading to an early break and sending CloseStream. The new code at lines 355-358 unconditionally continues after a flush, regardless of whether any frames were produced. This is actually the correct behavior — flush() should flush buffered data, not terminate the stream. The old behavior was arguably a latent bug where calling stream.flush() with no buffered audio would unexpectedly close the connection. This change is intentional and correct.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

@@ -398,13 +403,29 @@ class SpeechStreamv2 extends stt.SpeechStream {
}

@devin-ai-integration devin-ai-integration Bot Jun 17, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Deepgram API errors (type=Error) are silently swallowed by the message handler's try/catch

Pre-existing issue: #processStreamEvent throws at plugins/deepgram/src/stt_v2.ts:472 when Deepgram sends an error-type message, but this throw is caught by the try/catch at plugins/deepgram/src/stt_v2.ts:398-403 which logs it as 'Failed to parse Deepgram message'. The error is effectively swallowed — the stream continues as if nothing happened, and the new retry mechanism won't trigger for Deepgram API errors (only for unexpected WebSocket closes). This is not introduced by this PR but is impacted by it: the PR ensures unexpected closes trigger retries, but Deepgram error messages still don't.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is a real pre-existing issue, but I am leaving it out of this PR to keep the fix scoped to unexpected WebSocket close recovery. Handling Deepgram type: "Error" messages needs a separate decision about how provider error payloads should map onto APIConnectionError vs APIStatusError and which cases should be retryable. I would prefer to handle that in a follow-up so this PR does not broaden retry semantics beyond the observed unrecovered close path.

mykmelez added 3 commits June 16, 2026 18:08
After an unexpected reconnect the fresh Deepgram socket restarts
audio_window at 0, but startTimeOffset was left unchanged, so
post-reconnect transcripts were timestamped near the start of the
session. Downstream consumers that gate on absolute timing (e.g. drop
finals that land before the current answer's audio) would discard them,
so long sessions could still stall after the first reconnect.

Track the audio already streamed (#sentAudioSec) and snapshot it as a
per-connection base at each connect, offsetting that connection's window
times by it so transcript timestamps stay monotonic across reconnects.

Adds a regression test asserting the second turn's final timestamp does
not reset below the first across an unexpected reconnect.
@mykmelez

Copy link
Copy Markdown
Contributor Author

Pushed two follow-up commits for the Flux reconnect path:

  • 71bf6626 fix(deepgram): preserve STT timebase across reconnect
  • 3862af79 docs(deepgram): note the sttNode timebase contract

The reason is that reconnecting the WebSocket is necessary but not quite sufficient for Flux. A fresh Deepgram Flux socket restarts its provider-side audio_window at 0, while the Agents sttNode creates the STT stream once and sets stream.startTimeOffset once to provide session-relative timestamps across reconnects. Since plugin-internal reconnects do not recreate the stream or reset that offset, the plugin needs to preserve the per-stream audio timebase when opening a new socket.

The implementation now tracks consumed audio seconds and snapshots that value when each socket opens. Transcript timestamps use startTimeOffset + connectionTimeBaseSec, so a new socket's 0-based audio_window continues from the prior connection instead of jumping back near session start.

I also ran a local live harness against real Deepgram Flux: it streamed the repo speech fixture through STTv2, used a local WebSocket proxy to force an unexpected 1011 close after about 8.3s of audio, observed the retry/reconnect, and then saw a post-reconnect transcript from the new provider socket. The provider's second connection reported audioWindowEnd=0.640, while the SDK transcript timestamp was end=8.940, confirming the new connection was offset by the prior stream time rather than resetting to 0.

This intentionally does not change the base retry lifetime cap (maxRetry); that still seems like a separate retry policy discussion.

devin-ai-integration[bot]

This comment was marked as resolved.

devin-ai-integration[bot]

This comment was marked as resolved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant