diff --git a/README.md b/README.md index e36ca5cd..263a72d4 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ stopping you from creating a client in any other language (see - [reset_peer](#reset_peer) - [slicer](#slicer) - [limit_data](#limit_data) + - [packet_loss](#packet_loss) - [HTTP API](#http-api) - [Proxy fields:](#proxy-fields) - [Toxic fields:](#toxic-fields) @@ -446,6 +447,15 @@ Closes connection when transmitted data exceeded limit. - `bytes`: number of bytes it should transmit before connection is closed +#### packet_loss + +Randomly drops chunks flowing through the proxy simulating +flaky Wi-Fi, mobile, or satellite network conditions. + +Attributes: + - `loss_rate`: probability [0.0–1.0] that a chunk is dropped (default 0.1) + - `correlation`: extra drop probability when the previous chunk was dropped, modeling burst loss (default 0.0) + ### HTTP API All communication with the Toxiproxy daemon from the client happens through the diff --git a/toxics/packet_loss.go b/toxics/packet_loss.go new file mode 100644 index 00000000..247468ef --- /dev/null +++ b/toxics/packet_loss.go @@ -0,0 +1,130 @@ +package toxics + +// PacketLossToxic randomly drops StreamChunks passing through the proxy, +// simulating packet loss / flaky network conditions. +// +// Attributes: +// - loss_rate : float64 probability [0.0–1.0] that a chunk is dropped +// (default 0.1 -> 10 %) +// - correlation : float64 probability [0.0–1.0] that the *next* chunk is +// also dropped when the previous one was (Gilbert- +// Elliott burst model; default 0.0 -> no burstiness) +// +// How it fits toxiproxy's pipeline: +// +// Client -> [noop] -> [packet_loss] -> [noop] -> Upstream +// | +// dropped chunks +// are discarded +// +// Registration happens automatically via init(). + +import ( + "math/rand" +) + +// PacketLossToxicState holds per-connection mutable state so that the +// main toxic struct (shared across connections) stays read-only. +type PacketLossToxicState struct { + // wasDropped records whether the previous chunk was dropped; used for + // burst-correlation logic. + wasDropped bool + // rng is a per-connection source so concurrent connections don't + // contend on a shared global rand. + rng *rand.Rand +} + +// PacketLossToxic is the toxic struct. Fields are JSON-tagged to match +// toxiproxy's HTTP API convention. +type PacketLossToxic struct { + // LossRate is the baseline probability that any individual chunk is + // dropped. Range [0.0, 1.0]. Default 0.1 (10 %). + LossRate float64 `json:"loss_rate"` + + // Correlation is the extra probability that the *next* chunk is dropped + // when the current one was dropped, modelling burst packet loss (Gilbert- + // Elliott model simplified). Range [0.0, 1.0]. Default 0.0. + Correlation float64 `json:"correlation"` +} + +// NewState satisfies the StatefulToxic interface. toxiproxy calls this once +// per new connection so every connection gets its own RNG and drop state. +func (t *PacketLossToxic) NewState() interface{} { + return &PacketLossToxicState{ + rng: rand.New(rand.NewSource(rand.Int63())), + } +} + +// Pipe satisfies the Toxic interface. It reads chunks from stub.Input, +// decides whether to forward or drop each one, and writes survivors to +// stub.Output. It exits when the input channel is closed or an interrupt +// arrives. +func (t *PacketLossToxic) Pipe(stub *ToxicStub) { + state := stub.State.(*PacketLossToxicState) + + // Clamp configuration to valid ranges once, up front. + lossRate := clamp(t.LossRate, 0.0, 1.0) + correlation := clamp(t.Correlation, 0.0, 1.0) + + for { + select { + case <-stub.Interrupt: + // toxiproxy is removing or reconfiguring this toxic; drain cleanly. + return + + case chunk, ok := <-stub.Input: + if !ok { + // Upstream closed the connection. + return + } + + if t.shouldDrop(state, lossRate, correlation) { + // Drop: discard the chunk entirely. The byte slice is simply + // not forwarded – no close, no RST – mimicking a lost IP packet. + state.wasDropped = true + continue + } + + state.wasDropped = false + + // Forward the chunk unmodified. + select { + case stub.Output <- chunk: + case <-stub.Interrupt: + return + } + } + } +} + +// shouldDrop returns true when the current chunk must be discarded. +// It implements a simplified Gilbert-Elliott two-state model: +// - In the "good" state -> drop with probability lossRate +// - In the "bad" state -> drop with probability (lossRate + correlation), +// where "bad" means the previous chunk was also dropped. +func (t *PacketLossToxic) shouldDrop( + state *PacketLossToxicState, + lossRate, correlation float64, +) bool { + p := lossRate + if state.wasDropped { + p = min(1.0, lossRate+correlation) + } + return state.rng.Float64() < p +} + +// init registers the toxic with toxiproxy automatically at startup. +func init() { + Register("packet_loss", new(PacketLossToxic)) +} + +// clamp constrains v to the range [lo, hi]. +func clamp(v, lo, hi float64) float64 { + if v < lo { + return lo + } + if v > hi { + return hi + } + return v +} \ No newline at end of file diff --git a/toxics/packet_loss_test.go b/toxics/packet_loss_test.go new file mode 100644 index 00000000..c4ec2c3a --- /dev/null +++ b/toxics/packet_loss_test.go @@ -0,0 +1,137 @@ +package toxics_test + +import ( + "testing" + "time" + + "github.com/Shopify/toxiproxy/v2/stream" + "github.com/Shopify/toxiproxy/v2/toxics" +) + +// TestPacketLossToxicNoLoss verifies that with loss_rate=0 every chunk passes. +func TestPacketLossToxicNoLoss(t *testing.T) { + toxic := &toxics.PacketLossToxic{LossRate: 0.0, Correlation: 0.0} + runPipeTest(t, toxic, 1000, 0) +} + +// TestPacketLossToxicFullLoss verifies that with loss_rate=1 no chunk passes. +func TestPacketLossToxicFullLoss(t *testing.T) { + toxic := &toxics.PacketLossToxic{LossRate: 1.0, Correlation: 0.0} + runPipeTest(t, toxic, 100, 100) +} + +// TestPacketLossToxicApproximateRate verifies drop rate is within ±10% of target. +func TestPacketLossToxicApproximateRate(t *testing.T) { + const targetRate = 0.20 + toxic := &toxics.PacketLossToxic{LossRate: targetRate, Correlation: 0.0} + + const n = 1000 + dropped, _ := runPipeTestWithStats(t, toxic, n) + + empirical := float64(dropped) / float64(n) + tolerance := 0.10 + + if empirical < targetRate-tolerance || empirical > targetRate+tolerance { + t.Errorf("empirical drop rate %.2f outside [%.2f, %.2f]", + empirical, targetRate-tolerance, targetRate+tolerance) + } +} + +// TestPacketLossToxicIndependentConnections verifies independent RNG per connection. +func TestPacketLossToxicIndependentConnections(t *testing.T) { + toxic := &toxics.PacketLossToxic{LossRate: 0.5} + s1 := toxic.NewState() + s2 := toxic.NewState() + + if s1 == s2 { + t.Error("NewState must return different instances per connection") + } +} + +// TestPacketLossToxicPipeDrains confirms Pipe exits cleanly on interrupt. +func TestPacketLossToxicPipeDrains(t *testing.T) { + toxic := &toxics.PacketLossToxic{LossRate: 0.0} + + input := make(chan *stream.StreamChunk, 1) + output := make(chan *stream.StreamChunk, 1) + interrupt := make(chan struct{}) + + stub := &toxics.ToxicStub{ + Input: input, + Output: output, + Interrupt: interrupt, + State: toxic.NewState(), + } + + done := make(chan struct{}) + go func() { + toxic.Pipe(stub) + close(done) + }() + + close(interrupt) + + select { + case <-done: + // OK + case <-time.After(time.Second): + t.Fatal("Pipe did not exit within 1s after interrupt") + } +} + +// runPipeTest sends n chunks and asserts exactly expectDropped were dropped. +func runPipeTest(t *testing.T, toxic *toxics.PacketLossToxic, n int, expectDropped int) { + t.Helper() + dropped, passed := runPipeTestWithStats(t, toxic, n) + if dropped != expectDropped { + t.Errorf("expected %d dropped, got %d (passed: %d)", expectDropped, dropped, passed) + } +} + +// runPipeTestWithStats returns (dropped, passed) counts. +// It drains output concurrently to avoid blocking the pipe. +func runPipeTestWithStats(t *testing.T, toxic *toxics.PacketLossToxic, n int) (int, int) { + t.Helper() + + // Buffer input generously so the sender never blocks. + input := make(chan *stream.StreamChunk, n) + // Unbuffered output — drained by a goroutine below. + output := make(chan *stream.StreamChunk) + interrupt := make(chan struct{}) + + stub := &toxics.ToxicStub{ + Input: input, + Output: output, + Interrupt: interrupt, + State: toxic.NewState(), + } + + // Fill input before starting Pipe so timing doesn't matter. + for i := 0; i < n; i++ { + input <- &stream.StreamChunk{ + Data: []byte{byte(i % 256)}, + Timestamp: time.Now(), + } + } + close(input) + + // Drain output concurrently so Pipe never blocks on send. + passed := 0 + drainDone := make(chan struct{}) + go func() { + for range output { + passed++ + } + close(drainDone) + }() + + // Run Pipe — exits when input is closed. + toxic.Pipe(stub) + + // Pipe has returned, close output so the drain goroutine finishes. + close(output) + <-drainDone + + dropped := n - passed + return dropped, passed +} \ No newline at end of file