Skip to content

Commit

Permalink
[BPF] send RST to midflow TCp packets from unknown streams
Browse files Browse the repository at this point in the history
For a smooth switch from iptables to ebpf mode, we do not want to
interrupt existing connections. If we see midflow packets, we pass them
to the host stack. If the stack can verify that they belong to an
existing conntrack, we let them through and we learn the conntrack.

We drop the rest. However, there are some situations when we can see a
stray TCP packet during ebpf mode, for instance when a pod dies and ECMP
kicks in and sends a packet to a different host.

If such a packet gets dropped, the end of the connections remains stuck.
This change sends an RST to such a stream instead of just dropping the
packets so that the end host can break the connection.

Fixes #8882
  • Loading branch information
tomastigera committed Jun 24, 2024
1 parent 54c3a00 commit 22554ca
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 2 deletions.
9 changes: 8 additions & 1 deletion felix/dataplane/linux/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -1452,9 +1452,16 @@ func (d *InternalDataplane) setUpIptablesBPF() {
Comment: []string{"Accept packets from flows that pre-date BPF."},
Action: iptables.AcceptAction{},
},
iptables.Rule{
Match: iptables.Match().
MarkMatchesWithMask(tcdefs.MarkSeenFallThrough, tcdefs.MarkSeenFallThroughMask).
Protocol("tcp"),
Comment: []string{"REJECT/rst packets from unknown TCP flows."},
Action: iptables.RejectAction{With: "tcp-reset"},
},
iptables.Rule{
Match: iptables.Match().MarkMatchesWithMask(tcdefs.MarkSeenFallThrough, tcdefs.MarkSeenFallThroughMask),
Comment: []string{fmt.Sprintf("%s packets from unknown flows.", d.ruleRenderer.IptablesFilterDenyAction())},
Comment: []string{fmt.Sprintf("%s packets from unknown TCP flows.", d.ruleRenderer.IptablesFilterDenyAction())},
Action: d.ruleRenderer.IptablesFilterDenyAction(),
},
)
Expand Down
60 changes: 59 additions & 1 deletion felix/fv/pktgen/pktgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
const usage = `pktgen: generates packets for Felix FV testing.
Usage:
pktgen <ip_src> <ip_dst> <proto> [--ip-id=<ip_id>] [--port-src=<port_src>] [--port-dst=<port_dst>]`
pktgen <ip_src> <ip_dst> <proto> [--ip-id=<ip_id>] [--port-src=<port_src>] [--port-dst=<port_dst>]
[--tcp-syn] [--tcp-ack] [--tcp-fin] [--tcp-rst] [--tcp-ack-no=<ack_no>] [--tcp-seq-no=<seq_no>]
`

func main() {
log.SetLevel(log.InfoLevel)
Expand Down Expand Up @@ -97,6 +99,8 @@ func main() {
switch args["<proto>"] {
case "udp":
proto = layers.IPProtocolUDP
case "tcp":
proto = layers.IPProtocolTCP
default:
log.Fatal("unsupported protocol")
}
Expand Down Expand Up @@ -147,6 +151,60 @@ func main() {
} else {
iphdr.(*layers.IPv6).Length += udp.Length
}
case layers.IPProtocolTCP:
ack := uint32(0)
if args["--tcp-ack-no"] != nil {
a, err := strconv.Atoi(args["--tcp-ack-no"].(string))
if err != nil {
log.WithError(err).Fatal("tcp ack no not a number")
}
if a > math.MaxUint32 || a < 0 {
log.Fatal("Ack no must be an unsigned 32-bit integer")
}
ack = uint32(a)
}
seq := uint32(0)
if args["--tcp-seq-no"] != nil {
s, err := strconv.Atoi(args["--tcp-seq-no"].(string))
if err != nil {
log.WithError(err).Fatal("tcp seq no not a number")
}
if s > math.MaxUint32 || s < 0 {
log.Fatal("Seq no must be an unsigned 32-bit integer")
}
seq = uint32(s)
}
tcp := &layers.TCP{
SrcPort: layers.TCPPort(sport),
DstPort: layers.TCPPort(dport),
Ack: ack,
Seq: seq,
DataOffset: 5,
}

if args["--tcp-syn"] != nil {
tcp.SYN = args["--tcp-syn"].(bool)
}
if args["--tcp-ack"] != nil {
tcp.ACK = args["--tcp-ack"].(bool)
}
if args["--tcp-fin"] != nil {
tcp.FIN = args["--tcp-fin"].(bool)
}
if args["--tcp-rst"] != nil {
tcp.RST = args["--tcp-rst"].(bool)
}

if err := tcp.SetNetworkLayerForChecksum(iphdr.(gopacket.NetworkLayer)); err != nil {
log.WithError(err).Fatal("cannot checksum tcp")
}

l4 = tcp
if family == 4 {
iphdr.(*layers.IPv4).Length += uint16(int(tcp.DataOffset*4) + len(payload))
} else {
iphdr.(*layers.IPv6).Length += uint16(int(tcp.DataOffset*4) + len(payload))
}
}

pkt := gopacket.NewSerializeBuffer()
Expand Down
43 changes: 43 additions & 0 deletions felix/fv/spoof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ package fv_test

import (
"fmt"
"regexp"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

api "github.com/projectcalico/api/pkg/apis/projectcalico/v3"

"github.com/projectcalico/calico/felix/fv/connectivity"
"github.com/projectcalico/calico/felix/fv/containers"
"github.com/projectcalico/calico/felix/fv/infrastructure"
"github.com/projectcalico/calico/felix/fv/workload"
)
Expand Down Expand Up @@ -89,6 +93,41 @@ var _ = Describe("Spoof tests", func() {
cc.ExpectSome(w[1], spoofed)
cc.CheckConnectivity()
})

Context("with external client", func() {
var (
externalClient *containers.Container
)
BeforeEach(func() {
externalClient = infrastructure.RunExtClient("ext-client")
err := externalClient.CopyFileIntoContainer("../bin/pktgen", "pktgen")
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
externalClient.Stop()
})

It("should send RST for a stray TCP packet", func() {
tcpdump := tc.Felixes[0].AttachTCPDump("eth0")
tcpdump.SetLogEnabled(true)
pattern := fmt.Sprintf(`IP %s\.1234 > %s\.3434: Flags \[R\], seq 123`, tc.Felixes[0].IP, externalClient.IP)
/*
if testOpts.ipv6 {
pattern := fmt.Sprintf(`IP6 %s.1234 > %s.3434: Flags [R], seq 123`, tc.Felixes[0].IP, externalClient.IP)
}
*/
tcpdump.AddMatcher("RST", regexp.MustCompile(pattern))
tcpdump.Start("tcp", "port", "1234")
defer tcpdump.Stop()

err := externalClient.ExecMayFail("pktgen", externalClient.IP, tc.Felixes[0].IP, "tcp",
"--port-src", "3434", "--port-dst", "1234", "--tcp-ack", "--tcp-ack-no=123", "--tcp-seq-no=111")
Expect(err).NotTo(HaveOccurred())
Eventually(tcpdump.MatchCountFn("RST"), "5s", "200ms").Should(
BeNumerically("==", 1),
"We should see RST to a packet from an unknown flow")
})
})
}

Context("_BPF-SAFE_ IPv4", func() {
Expand All @@ -97,6 +136,8 @@ var _ = Describe("Spoof tests", func() {
infra, err = infrastructure.GetEtcdDatastoreInfra()
Expect(err).NotTo(HaveOccurred())
opts := infrastructure.DefaultTopologyOptions()
opts.ExtraEnvVars["FELIX_BPFConnectTimeLoadBalancing"] = string(api.BPFConnectTimeLBDisabled)
opts.ExtraEnvVars["FELIX_BPFHostNetworkedNATWithoutCTLB"] = string(api.BPFHostNetworkedNATEnabled)
tc, _ = infrastructure.StartNNodeTopology(3, opts, infra)
// Install a default profile allowing all ingress and egress,
// in the absence of policy.
Expand Down Expand Up @@ -130,6 +171,8 @@ var _ = Describe("Spoof tests", func() {
opts := infrastructure.DefaultTopologyOptions()
opts.EnableIPv6 = true
opts.IPIPEnabled = false
opts.ExtraEnvVars["FELIX_BPFConnectTimeLoadBalancing"] = string(api.BPFConnectTimeLBDisabled)
opts.ExtraEnvVars["FELIX_BPFHostNetworkedNATWithoutCTLB"] = string(api.BPFHostNetworkedNATEnabled)
opts.ExtraEnvVars["FELIX_IPV6SUPPORT"] = "true"

// The IPv4 tests had each workload running on an individual
Expand Down
4 changes: 4 additions & 0 deletions felix/iptables/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,13 @@ func (g DropAction) String() string {

type RejectAction struct {
TypeReject struct{}
With string
}

func (g RejectAction) ToFragment(features *environment.Features) string {
if g.With != "" {
return fmt.Sprintf("--jump REJECT --reject-with %s", g.With)
}
return "--jump REJECT"
}

Expand Down

0 comments on commit 22554ca

Please sign in to comment.