Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SlowOpenToxic to approximate the TCP handshake latency #536

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions scripts/test-e2e
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,28 @@ cli toxic delete --toxicName="latency_upstream" shopify_http

echo -e "-----------------\n"

echo "=== SlowOpen toxic downstream"

cli toxic add --downstream \
--type=slow_open \
--toxicName="slow_open_downstream" \
--attribute="delay=1000" \
--toxicity=0.99 \
shopify_http
cli inspect shopify_http

benchmark

cli toxic update --toxicName="slow_open_downstream" \
--attribute="delay=500" \
--toxicity=0.7 \
shopify_http
cli inspect shopify_http

cli toxic delete --toxicName="slow_open_downstream" shopify_http

echo -e "-----------------\n"

echo "=== Bandwidth toxic"

cli toxic add --type=bandwidth \
Expand Down
3 changes: 3 additions & 0 deletions toxics/latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
)

// The LatencyToxic passes data through with the a delay of latency +/- jitter added.
//
// Note that the initial TCP handshake is not impacted by this toxic. For more details,
// see the SlowOpenToxic.
type LatencyToxic struct {
// Times in milliseconds
Latency int64 `json:"latency"`
Expand Down
86 changes: 46 additions & 40 deletions toxics/latency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,57 +64,63 @@ func DoLatencyTest(t *testing.T, upLatency, downLatency *toxics.LatencyToxic) {
downLatency.Jitter,
)

msg := []byte("hello world " + strings.Repeat("a", 32*1024) + "\n")

timer := time.Now()
_, err := conn.Write(msg)
if err != nil {
t.Error("Failed writing to TCP server", err)
}

resp := <-response
if !bytes.Equal(resp, msg) {
t.Error("Server didn't read correct bytes from client:", string(resp))
}
AssertDeltaTime(t,
"Server read",
time.Since(timer),
time.Duration(upLatency.Latency)*time.Millisecond,
time.Duration(upLatency.Jitter+10)*time.Millisecond,
)
timer2 := time.Now()

scan := bufio.NewScanner(conn)
if scan.Scan() {
resp = append(scan.Bytes(), '\n')
if !bytes.Equal(resp, msg) {
t.Error("Client didn't read correct bytes from server:", string(resp))
}
}
AssertDeltaTime(t,
"Client read",
time.Since(timer2),
time.Duration(downLatency.Latency)*time.Millisecond,
time.Duration(downLatency.Jitter+10)*time.Millisecond,
)
AssertDeltaTime(t,
"Round trip",
time.Since(timer),
time.Duration(upLatency.Latency+downLatency.Latency)*time.Millisecond,
time.Duration(upLatency.Jitter+downLatency.Jitter+20)*time.Millisecond,
)
// Expecting the same latency in both rounds
doLatencyRound(t, conn, response, upLatency.Latency, downLatency.Latency, upLatency.Jitter, downLatency.Jitter)
doLatencyRound(t, conn, response, upLatency.Latency, downLatency.Latency, upLatency.Jitter, downLatency.Jitter)

ctx := context.Background()
proxy.Toxics.RemoveToxic(ctx, "latency_up")
proxy.Toxics.RemoveToxic(ctx, "latency_down")

err = conn.Close()
err := conn.Close()
if err != nil {
t.Error("Failed to close TCP connection", err)
}
})
}

func doLatencyRound(t *testing.T, conn net.Conn, response chan []byte, upLatency, downLatency, upJitter, downJitter int64) {
msg := []byte("hello world " + strings.Repeat("a", 32*1024) + "\n")

timer := time.Now()
_, err := conn.Write(msg)
if err != nil {
t.Error("Failed writing to TCP server", err)
}

resp := <-response
if !bytes.Equal(resp, msg) {
t.Error("Server didn't read correct bytes from client:", string(resp))
}
AssertDeltaTime(t,
"Server read",
time.Since(timer),
time.Duration(upLatency)*time.Millisecond,
time.Duration(upJitter+10)*time.Millisecond,
)
timer2 := time.Now()

scan := bufio.NewScanner(conn)
if scan.Scan() {
resp = append(scan.Bytes(), '\n')
if !bytes.Equal(resp, msg) {
t.Error("Client didn't read correct bytes from server:", string(resp))
}
}
AssertDeltaTime(t,
"Client read",
time.Since(timer2),
time.Duration(downLatency)*time.Millisecond,
time.Duration(downJitter+10)*time.Millisecond,
)
AssertDeltaTime(t,
"Round trip",
time.Since(timer),
time.Duration(upLatency+downLatency)*time.Millisecond,
time.Duration(upJitter+downJitter+20)*time.Millisecond,
)
}

func TestUpstreamLatency(t *testing.T) {
DoLatencyTest(t, &toxics.LatencyToxic{Latency: 100}, nil)
}
Expand Down
81 changes: 81 additions & 0 deletions toxics/slow_open.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package toxics

import (
"time"
)

// The SlowOpenToxic adds a delay to the first data packet of a new TCP
// connection, to simulate the delay experienced by a calling application
// due to the TCP handshake.
//
// For context: the TCP handshake is not covered by LatencyToxic
// and cannot be, since (in the current Toxiproxy architecture) it is
// handled by the OS network stack.
// This means that you cannot accurately simulate a latency occurring
// during the connect phase and thus test behaviors related to connection
// timeouts.
// However, if your goal is to simulate the delays experienced by the
// caller at the application level, using this toxic in addition to
// LatencyToxic will model them more accurately than using LatencyToxic
// alone.
type SlowOpenToxic struct {
// Times in milliseconds
Delay int64 `json:"delay"`
}

type SlowOpenToxicState struct {
Warm bool
}

func (t *SlowOpenToxic) GetBufferSize() int {
return 1024
}

func (t *SlowOpenToxic) Pipe(stub *ToxicStub) {
state := stub.State.(*SlowOpenToxicState)

for {
if !state.Warm {
select {
case <-stub.Interrupt:
return
case c := <-stub.Input:
if c == nil {
stub.Close()
return
}

delay := time.Duration(t.Delay) * time.Millisecond
state.Warm = true

select {
case <-time.After(delay):
c.Timestamp = c.Timestamp.Add(delay)
stub.Output <- c
case <-stub.Interrupt:
stub.Output <- c
return
}
}
} else {
select {
case <-stub.Interrupt:
return
case c := <-stub.Input:
if c == nil {
stub.Close()
return
}
stub.Output <- c
}
}
}
}

func (t *SlowOpenToxic) NewState() interface{} {
return new(SlowOpenToxicState)
}

func init() {
Register("slow_open", new(SlowOpenToxic))
}
Loading