diff --git a/scripts/test-e2e b/scripts/test-e2e index 42dafba5..b1370f3e 100755 --- a/scripts/test-e2e +++ b/scripts/test-e2e @@ -113,6 +113,28 @@ cli toxic delete --toxicName="latency_upstream" shopify_http echo -e "-----------------\n" +echo "=== SlowOpen toxic downstream" + +cli toxic add --downstream \ + --type=slow_open \ + --toxicName="slow_open_downstream" \ + --attribute="delay=1000" \ + --toxicity=0.99 \ + shopify_http +cli inspect shopify_http + +benchmark + +cli toxic update --toxicName="slow_open_downstream" \ + --attribute="delay=500" \ + --toxicity=0.7 \ + shopify_http +cli inspect shopify_http + +cli toxic delete --toxicName="slow_open_downstream" shopify_http + +echo -e "-----------------\n" + echo "=== Bandwidth toxic" cli toxic add --type=bandwidth \ diff --git a/toxics/latency.go b/toxics/latency.go index a56370af..1d72f99a 100644 --- a/toxics/latency.go +++ b/toxics/latency.go @@ -6,6 +6,9 @@ import ( ) // The LatencyToxic passes data through with the a delay of latency +/- jitter added. +// +// Note that the initial TCP handshake is not impacted by this toxic. For more details, +// see the SlowOpenToxic. type LatencyToxic struct { // Times in milliseconds Latency int64 `json:"latency"` diff --git a/toxics/latency_test.go b/toxics/latency_test.go index 1660c62f..81fed478 100644 --- a/toxics/latency_test.go +++ b/toxics/latency_test.go @@ -64,57 +64,63 @@ func DoLatencyTest(t *testing.T, upLatency, downLatency *toxics.LatencyToxic) { downLatency.Jitter, ) - msg := []byte("hello world " + strings.Repeat("a", 32*1024) + "\n") - - timer := time.Now() - _, err := conn.Write(msg) - if err != nil { - t.Error("Failed writing to TCP server", err) - } - - resp := <-response - if !bytes.Equal(resp, msg) { - t.Error("Server didn't read correct bytes from client:", string(resp)) - } - AssertDeltaTime(t, - "Server read", - time.Since(timer), - time.Duration(upLatency.Latency)*time.Millisecond, - time.Duration(upLatency.Jitter+10)*time.Millisecond, - ) - timer2 := time.Now() - - scan := bufio.NewScanner(conn) - if scan.Scan() { - resp = append(scan.Bytes(), '\n') - if !bytes.Equal(resp, msg) { - t.Error("Client didn't read correct bytes from server:", string(resp)) - } - } - AssertDeltaTime(t, - "Client read", - time.Since(timer2), - time.Duration(downLatency.Latency)*time.Millisecond, - time.Duration(downLatency.Jitter+10)*time.Millisecond, - ) - AssertDeltaTime(t, - "Round trip", - time.Since(timer), - time.Duration(upLatency.Latency+downLatency.Latency)*time.Millisecond, - time.Duration(upLatency.Jitter+downLatency.Jitter+20)*time.Millisecond, - ) + // Expecting the same latency in both rounds + doLatencyRound(t, conn, response, upLatency.Latency, downLatency.Latency, upLatency.Jitter, downLatency.Jitter) + doLatencyRound(t, conn, response, upLatency.Latency, downLatency.Latency, upLatency.Jitter, downLatency.Jitter) ctx := context.Background() proxy.Toxics.RemoveToxic(ctx, "latency_up") proxy.Toxics.RemoveToxic(ctx, "latency_down") - err = conn.Close() + err := conn.Close() if err != nil { t.Error("Failed to close TCP connection", err) } }) } +func doLatencyRound(t *testing.T, conn net.Conn, response chan []byte, upLatency, downLatency, upJitter, downJitter int64) { + msg := []byte("hello world " + strings.Repeat("a", 32*1024) + "\n") + + timer := time.Now() + _, err := conn.Write(msg) + if err != nil { + t.Error("Failed writing to TCP server", err) + } + + resp := <-response + if !bytes.Equal(resp, msg) { + t.Error("Server didn't read correct bytes from client:", string(resp)) + } + AssertDeltaTime(t, + "Server read", + time.Since(timer), + time.Duration(upLatency)*time.Millisecond, + time.Duration(upJitter+10)*time.Millisecond, + ) + timer2 := time.Now() + + scan := bufio.NewScanner(conn) + if scan.Scan() { + resp = append(scan.Bytes(), '\n') + if !bytes.Equal(resp, msg) { + t.Error("Client didn't read correct bytes from server:", string(resp)) + } + } + AssertDeltaTime(t, + "Client read", + time.Since(timer2), + time.Duration(downLatency)*time.Millisecond, + time.Duration(downJitter+10)*time.Millisecond, + ) + AssertDeltaTime(t, + "Round trip", + time.Since(timer), + time.Duration(upLatency+downLatency)*time.Millisecond, + time.Duration(upJitter+downJitter+20)*time.Millisecond, + ) +} + func TestUpstreamLatency(t *testing.T) { DoLatencyTest(t, &toxics.LatencyToxic{Latency: 100}, nil) } diff --git a/toxics/slow_open.go b/toxics/slow_open.go new file mode 100644 index 00000000..56b1c947 --- /dev/null +++ b/toxics/slow_open.go @@ -0,0 +1,81 @@ +package toxics + +import ( + "time" +) + +// The SlowOpenToxic adds a delay to the first data packet of a new TCP +// connection, to simulate the delay experienced by a calling application +// due to the TCP handshake. +// +// For context: the TCP handshake is not covered by LatencyToxic +// and cannot be, since (in the current Toxiproxy architecture) it is +// handled by the OS network stack. +// This means that you cannot accurately simulate a latency occurring +// during the connect phase and thus test behaviors related to connection +// timeouts. +// However, if your goal is to simulate the delays experienced by the +// caller at the application level, using this toxic in addition to +// LatencyToxic will model them more accurately than using LatencyToxic +// alone. +type SlowOpenToxic struct { + // Times in milliseconds + Delay int64 `json:"delay"` +} + +type SlowOpenToxicState struct { + Warm bool +} + +func (t *SlowOpenToxic) GetBufferSize() int { + return 1024 +} + +func (t *SlowOpenToxic) Pipe(stub *ToxicStub) { + state := stub.State.(*SlowOpenToxicState) + + for { + if !state.Warm { + select { + case <-stub.Interrupt: + return + case c := <-stub.Input: + if c == nil { + stub.Close() + return + } + + delay := time.Duration(t.Delay) * time.Millisecond + state.Warm = true + + select { + case <-time.After(delay): + c.Timestamp = c.Timestamp.Add(delay) + stub.Output <- c + case <-stub.Interrupt: + stub.Output <- c + return + } + } + } else { + select { + case <-stub.Interrupt: + return + case c := <-stub.Input: + if c == nil { + stub.Close() + return + } + stub.Output <- c + } + } + } +} + +func (t *SlowOpenToxic) NewState() interface{} { + return new(SlowOpenToxicState) +} + +func init() { + Register("slow_open", new(SlowOpenToxic)) +} diff --git a/toxics/slow_open_test.go b/toxics/slow_open_test.go new file mode 100644 index 00000000..03d59113 --- /dev/null +++ b/toxics/slow_open_test.go @@ -0,0 +1,211 @@ +package toxics_test + +import ( + "context" + "io" + "net" + "strings" + "testing" + "time" + + "github.com/Shopify/toxiproxy/v2" + "github.com/Shopify/toxiproxy/v2/testhelper" + "github.com/Shopify/toxiproxy/v2/toxics" +) + +func DoSlowOpenTest(t *testing.T, upSlowOpen, downSlowOpen *toxics.SlowOpenToxic) { + WithEchoProxy(t, func(conn net.Conn, response chan []byte, proxy *toxiproxy.Proxy) { + if upSlowOpen == nil { + upSlowOpen = &toxics.SlowOpenToxic{} + } else { + _, err := proxy.Toxics.AddToxicJson( + ToxicToJson(t, "slow_open_up", "slow_open", "upstream", upSlowOpen), + ) + if err != nil { + t.Error("AddToxicJson returned error:", err) + } + } + if downSlowOpen == nil { + downSlowOpen = &toxics.SlowOpenToxic{} + } else { + _, err := proxy.Toxics.AddToxicJson( + ToxicToJson(t, "slow_open_down", "slow_open", "downstream", downSlowOpen), + ) + if err != nil { + t.Error("AddToxicJson returned error:", err) + } + } + t.Logf( + "Using slow_open: Up: %dms, Down: %dms", + upSlowOpen.Delay, + downSlowOpen.Delay, + ) + + // First round: expecting delay + doLatencyRound(t, conn, response, upSlowOpen.Delay, downSlowOpen.Delay, 0, 0) + // Second and third rounds: not expecting delay + doLatencyRound(t, conn, response, 0, 0, 0, 0) + + ctx := context.Background() + proxy.Toxics.RemoveToxic(ctx, "slow_open_up") + proxy.Toxics.RemoveToxic(ctx, "slow_open_down") + + err := conn.Close() + if err != nil { + t.Error("Failed to close TCP connection", err) + } + }) +} + +func TestUpstreamSlowOpen(t *testing.T) { + DoSlowOpenTest(t, &toxics.SlowOpenToxic{Delay: 100}, nil) +} + +func TestDownstreamSlowOpen(t *testing.T) { + DoSlowOpenTest(t, nil, &toxics.SlowOpenToxic{Delay: 100}) +} + +func TestFullstreamSlowOpenEven(t *testing.T) { + DoSlowOpenTest(t, &toxics.SlowOpenToxic{Delay: 100}, &toxics.SlowOpenToxic{Delay: 100}) +} + +func TestFullstreamSlowOpenBiasUp(t *testing.T) { + DoSlowOpenTest(t, &toxics.SlowOpenToxic{Delay: 1000}, &toxics.SlowOpenToxic{Delay: 100}) +} + +func TestFullstreamSlowOpenBiasDown(t *testing.T) { + DoSlowOpenTest(t, &toxics.SlowOpenToxic{Delay: 100}, &toxics.SlowOpenToxic{Delay: 1000}) +} + +func TestZeroDelay(t *testing.T) { + DoSlowOpenTest(t, &toxics.SlowOpenToxic{Delay: 0}, &toxics.SlowOpenToxic{Delay: 0}) +} + +func TestSlowOpenToxicCloseRace(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatal("Failed to create TCP server", err) + } + + defer ln.Close() + + proxy := NewTestProxy("test", ln.Addr().String()) + proxy.Start() + defer proxy.Stop() + + go func() { + for { + _, err := ln.Accept() + if err != nil { + return + } + } + }() + + // Check for potential race conditions when interrupting toxics + for i := 0; i < 1000; i++ { + proxy.Toxics.AddToxicJson( + ToxicToJson(t, "", "slow_open", "upstream", &toxics.SlowOpenToxic{Delay: 10}), + ) + conn, err := net.Dial("tcp", proxy.Listen) + if err != nil { + t.Error("Unable to dial TCP server", err) + } + conn.Write([]byte("hello")) + conn.Close() + proxy.Toxics.RemoveToxic(context.Background(), "slow_open") + } +} + +func TestSlowOpenToxicWithLatencyToxic(t *testing.T) { + const delay = 500 + + WithEchoProxy(t, func(conn net.Conn, response chan []byte, proxy *toxiproxy.Proxy) { + var err error + _, err = proxy.Toxics.AddToxicJson( + ToxicToJson(t, "slow_open", "slow_open", "upstream", &toxics.SlowOpenToxic{ + Delay: delay, + }), + ) + if err != nil { + t.Error("AddToxicJson returned error:", err) + } + _, err = proxy.Toxics.AddToxicJson( + ToxicToJson(t, "latency", "latency", "upstream", &toxics.LatencyToxic{ + Latency: delay, + }), + ) + if err != nil { + t.Error("AddToxicJson returned error:", err) + } + + // First round: expecting double delay (SlowOpen + Latency) + doLatencyRound(t, conn, response, delay+delay, 0, 0, 0) + + // Second and third rounds: expecting single delay (Latency only) + doLatencyRound(t, conn, response, 0+delay, 0, 0, 0) + doLatencyRound(t, conn, response, 0+delay, 0, 0, 0) + + ctx := context.Background() + proxy.Toxics.RemoveToxic(ctx, "slow_open") + proxy.Toxics.RemoveToxic(ctx, "latency") + + err = conn.Close() + if err != nil { + t.Error("Failed to close TCP connection", err) + } + }) +} + +func TestSlowOpenToxicBandwidth(t *testing.T) { + upstream := testhelper.NewUpstream(t, false) + defer upstream.Close() + + proxy := NewTestProxy("test", upstream.Addr()) + proxy.Start() + defer proxy.Stop() + + client, err := net.Dial("tcp", proxy.Listen) + if err != nil { + t.Fatalf("Unable to dial TCP server: %v", err) + } + + writtenPayload := []byte(strings.Repeat("hello world ", 1000)) + upstreamConn := <-upstream.Connections + go func(conn net.Conn, payload []byte) { + var err error + for err == nil { + _, err = conn.Write(payload) + } + }(upstreamConn, writtenPayload) + + proxy.Toxics.AddToxicJson(ToxicToJson(t, "", "slow_open", "", &toxics.SlowOpenToxic{Delay: 100})) + + time.Sleep(150 * time.Millisecond) // Wait for slow_open toxic + response := make([]byte, len(writtenPayload)) + + start := time.Now() + count := 0 + for i := 0; i < 100; i++ { + n, err := io.ReadFull(client, response) + if err != nil { + t.Fatalf("Could not read from socket: %v", err) + break + } + count += n + } + + // Assert the transfer was at least 100MB/s + AssertDeltaTime( + t, + "Latency toxic bandwidth", + time.Since(start), + 0, + time.Duration(count/100000)*time.Millisecond, + ) + + err = client.Close() + if err != nil { + t.Error("Failed to close TCP connection", err) + } +} diff --git a/toxics/toxic_test.go b/toxics/toxic_test.go index b8a123a1..85bdb01b 100644 --- a/toxics/toxic_test.go +++ b/toxics/toxic_test.go @@ -66,7 +66,7 @@ func WithEchoServer(t *testing.T, f func(string, chan []byte)) { ln.Close() scan := bufio.NewScanner(src) - if scan.Scan() { + for scan.Scan() { received := append(scan.Bytes(), '\n') response <- received