Skip to content

Commit

Permalink
Let consumer application read packets(plain and tls) perf buffers dir…
Browse files Browse the repository at this point in the history
…ectly (#121)
  • Loading branch information
iluxa authored Dec 12, 2024
1 parent 03ec8d4 commit bb40525
Show file tree
Hide file tree
Showing 12 changed files with 484 additions and 99 deletions.
7 changes: 7 additions & 0 deletions bpf/include/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,11 @@ Copyright (C) Kubeshark
#define memcpy(dest, src, n) __builtin_memcpy((dest), (src), (n))
#endif

#ifndef likely
#define likely(x) __builtin_expect((x), 1)
#endif
#ifndef unlikely
#define unlikely(x) __builtin_expect((x), 0)
#endif

#endif /* __UTIL__ */
12 changes: 10 additions & 2 deletions bpf/packet_sniffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,14 @@ static __always_inline int filter_packets(struct __sk_buff *skb, void *cgrpctxma
if (side == PACKET_DIRECTION_RECEIVED)
{
TRACE_PACKET("cg/in", true, skb->local_ip4, skb->remote_ip4, skb->local_port & 0xffff, skb->remote_port & 0xffff, cgroup_id);
save_packet(skb, src_ip, skb->remote_port>>16, dst_ip, bpf_htons(skb->local_port), cgroup_id, side);
}
else
{
TRACE_PACKET("cg/out", true, skb->local_ip4, skb->remote_ip4, skb->local_port & 0xffff, skb->remote_port & 0xffff, cgroup_id);
save_packet(skb, src_ip, bpf_htons(skb->local_port), dst_ip, skb->remote_port>>16, cgroup_id, side);
}

save_packet(skb, src_ip, src_port, dst_ip, dst_port, cgroup_id, side);

return 1;
}

Expand Down Expand Up @@ -211,6 +211,14 @@ static __noinline void _save_packet(struct pkt_sniffer_ctx *ctx)
packet_id = pkt_id_ptr->id++;
bpf_spin_unlock(&pkt_id_ptr->lock);

// send initial chunk before the first packet
if (unlikely(packet_id == 0)) {
if (bpf_perf_event_output(skb, &pkts_buffer, BPF_F_CURRENT_CPU, p, 0))
{
log_error(skb, LOG_ERROR_PKT_SNIFFER, 11, 0l, 0l);
}
}

if (bpf_map_update_elem(&packet_context, &packet_id, p, BPF_NOEXIST))
{
log_error(skb, LOG_ERROR_PKT_SNIFFER, 5, 0l, 0l);
Expand Down
16 changes: 11 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var procfs = flag.String("procfs", "/proc", "The procfs directory, used when map
// development
var debug = flag.Bool("debug", false, "Enable debug mode")

var initBPF = flag.Bool("init-bpf", false, "Use to initialize bpf filesystem. Common usage is from init containers.")

var disableEbpfCapture = flag.Bool("disable-ebpf", false, "Disable capture packet via eBPF")
var disableTlsLog = flag.Bool("disable-tls-log", false, "Disable tls logging")

Expand Down Expand Up @@ -103,6 +105,10 @@ func main() {
}
}()

if *initBPF {
initBPFSubsystem()
return
}
run()
}

Expand All @@ -120,11 +126,11 @@ func run() {
return
}

tcpMap, err := resolver.GatherPidsTCPMap(*procfs, isCgroupsV2)
if err != nil {
log.Error().Err(err).Msg("tcp map lookup failed")
return
}
tcpMap, err := resolver.GatherPidsTCPMap(*procfs, isCgroupsV2)
if err != nil {
log.Error().Err(err).Msg("tcp map lookup failed")
return
}

tracer = &Tracer{
procfs: *procfs,
Expand Down
111 changes: 101 additions & 10 deletions pkg/bpf/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package bpf

import (
"fmt"
"path/filepath"
"strings"

"bytes"
"os"
Expand All @@ -11,11 +13,18 @@ import (
"github.com/cilium/ebpf/asm"
"github.com/cilium/ebpf/features"
"github.com/go-errors/errors"
"github.com/kubeshark/tracer/misc"
"github.com/kubeshark/tracer/pkg/utils"
"github.com/moby/moby/pkg/parsers/kernel"
"github.com/rs/zerolog/log"
)

const (
PinPath = "/sys/fs/bpf/kubeshark"
PinNamePlainPackets = "packets_plain"
PinNameTLSPackets = "packets_tls"
)

// TODO: cilium/ebpf does not support .kconfig Therefore; for now, we build object files per kernel version.

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go@v0.12.3 -target $BPF_TARGET -cflags $BPF_CFLAGS -type tls_chunk -type goid_offsets Tracer ../../bpf/tracer.c
Expand All @@ -27,7 +36,7 @@ type BpfObjectsImpl struct {
specs *ebpf.CollectionSpec
}

func (objs *BpfObjectsImpl) loadBpfObjects(bpfConstants map[string]uint64, reader *bytes.Reader) error {
func (objs *BpfObjectsImpl) loadBpfObjects(bpfConstants map[string]uint64, mapReplacements map[string]*ebpf.Map, reader *bytes.Reader) error {
var err error

objs.specs, err = ebpf.LoadCollectionSpecFromReader(reader)
Expand All @@ -44,7 +53,10 @@ func (objs *BpfObjectsImpl) loadBpfObjects(bpfConstants map[string]uint64, reade
return err
}

err = objs.specs.LoadAndAssign(objs.bpfObjs, nil)
opts := ebpf.CollectionOptions{
MapReplacements: mapReplacements,
}
err = objs.specs.LoadAndAssign(objs.bpfObjs, &opts)
if err != nil {
var ve *ebpf.VerifierError
if errors.As(err, &ve) {
Expand All @@ -59,8 +71,7 @@ func (objs *BpfObjectsImpl) loadBpfObjects(bpfConstants map[string]uint64, reade
}

type BpfObjects struct {
BpfObjs TracerObjects
IsCgroupV2 bool
BpfObjs TracerObjects
}

func programHelperExists(pt ebpf.ProgramType, helper asm.BuiltinFunc) uint64 {
Expand All @@ -82,15 +93,41 @@ func NewBpfObjects(disableEbpfCapture bool) (*BpfObjects, error) {
}

cgroupV1 := uint64(1)
objs.IsCgroupV2, err = utils.IsCgroupV2()
isCgroupV2, err := utils.IsCgroupV2()
if err != nil {
log.Error().Err(err).Msg("read cgroups information failed:")
}
if objs.IsCgroupV2 {
if isCgroupV2 {
cgroupV1 = 0
}

log.Info().Msg(fmt.Sprintf("Detected Linux kernel version: %s cgroups version2: %v", kernelVersion, objs.IsCgroupV2))
mapReplacements := make(map[string]*ebpf.Map)
plainPath := filepath.Join(PinPath, PinNamePlainPackets)
tlsPath := filepath.Join(PinPath, PinNameTLSPackets)

if !kernel.CheckKernelVersion(5, 4, 0) {
disableEbpfCapture = true
}

markDisabledEBPF := func() error {
pathNoEbpf := filepath.Join(misc.GetDataDir(), "noebpf")
file, err := os.Create(pathNoEbpf)
if err != nil {
return err
}
file.Close()
return nil
}

ebpfBackendStatus := "enabled"
if disableEbpfCapture {
ebpfBackendStatus = "disabled"
if err = markDisabledEBPF(); err != nil {
return nil, err
}
}

log.Info().Msg(fmt.Sprintf("Detected Linux kernel version: %s cgroups version2: %v, eBPF backend %v", kernelVersion, isCgroupV2, ebpfBackendStatus))
kernelVersionInt := uint64(1_000_000)*uint64(kernelVersion.Kernel) + uint64(1_000)*uint64(kernelVersion.Major) + uint64(kernelVersion.Minor)

// TODO: cilium/ebpf does not support .kconfig Therefore; for now, we load object files according to kernel version.
Expand Down Expand Up @@ -127,16 +164,70 @@ func NewBpfObjects(disableEbpfCapture bool) (*BpfObjects, error) {
"DISABLE_EBPF_CAPTURE": disableCapture,
}

err = objects.loadBpfObjects(bpfConsts, bytes.NewReader(_TracerBytes))
pktsBuffer, err := ebpf.LoadPinnedMap(plainPath, nil)
if err == nil {
objs.BpfObjs = *objects.bpfObjs.(*TracerObjects)
mapReplacements["pkts_buffer"] = pktsBuffer
log.Info().Str("path", tlsPath).Msg("loaded plain packets buffer")
} else if !errors.Is(err, os.ErrNotExist) {
log.Error().Msg(fmt.Sprintf("load plain packets map failed: %v", err))
}

if err != nil {
chunksBuffer, err := ebpf.LoadPinnedMap(tlsPath, nil)
if err == nil {
mapReplacements["chunks_buffer"] = chunksBuffer
log.Info().Str("path", tlsPath).Msg("loaded tls packets buffer")
} else if !errors.Is(err, os.ErrNotExist) {
log.Error().Msg(fmt.Sprintf("load tls packets map failed: %v", err))
}

err = objects.loadBpfObjects(bpfConsts, mapReplacements, bytes.NewReader(_TracerBytes))
if err == nil {
objs.BpfObjs = *objects.bpfObjs.(*TracerObjects)
} else if err != nil {
log.Error().Msg(fmt.Sprintf("load bpf objects failed: %v", err))
return nil, err
}
}

// Pin packet perf maps:

defer func() {
if os.IsPermission(err) || strings.Contains(fmt.Sprintf("%v", err), "permission") {
log.Warn().Msg(fmt.Sprintf("There are no enough permissions to activate eBPF. Error: %v", err))
if err = markDisabledEBPF(); err != nil {
log.Error().Err(err).Msg("disable ebpf failed")
} else {
err = nil
}
}
}()

if err = os.MkdirAll(PinPath, 0700); err != nil {
log.Error().Msg(fmt.Sprintf("mkdir pin path failed: %v", err))
return nil, err
}

pinMap := func(mapName, path string, mapObj *ebpf.Map) error {
if _, ok := mapReplacements[mapName]; !ok {
if err = mapObj.Pin(path); err != nil {
log.Error().Err(err).Str("path", path).Msg("pin perf buffer failed")
return err
} else {
log.Info().Str("path", path).Msg("pinned perf buffer")
}
}
return nil
}

if !disableEbpfCapture {
if err = pinMap("pkts_buffer", plainPath, objs.BpfObjs.PktsBuffer); err != nil {
return nil, err
}
}

if err = pinMap("chunks_buffer", tlsPath, objs.BpfObjs.ChunksBuffer); err != nil {
return nil, err
}

return &objs, nil
}
63 changes: 48 additions & 15 deletions pkg/bpf/tls_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
"fmt"
"os"
"strconv"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/perf"
"github.com/go-errors/errors"
"github.com/hashicorp/golang-lru/simplelru"
"github.com/kubeshark/gopacket"
"github.com/kubeshark/tracer/misc"
"github.com/kubeshark/tracer/pkg/utils"
"github.com/rs/zerolog/log"
Expand All @@ -20,24 +23,32 @@ const (
fdCacheMaxItems = 500000 / fdCachedItemAvgSize
)

type RawWriter func(timestamp uint64, cgroupId uint64, direction uint8, firstLayerType gopacket.LayerType, l ...gopacket.SerializableLayer) (err error)
type GopacketWriter func(packet gopacket.Packet) (err error)

type TlsPoller struct {
streams map[string]*TlsStream
closeStreams chan string
chunksReader *perf.Reader
fdCache *simplelru.LRU // Actual type is map[string]addressPair
evictedCounter int
Sorter *PacketSorter
streams map[string]*TlsStream
closeStreams chan string
chunksReader *perf.Reader
fdCache *simplelru.LRU // Actual type is map[string]addressPair
evictedCounter int
rawWriter RawWriter
gopacketWriter GopacketWriter
receivedPackets uint64
lostChunks uint64
}

func NewTlsPoller(
bpfObjs *BpfObjects,
sorter *PacketSorter,
perfBuffer *ebpf.Map,
rawWriter RawWriter,
gopacketWriter GopacketWriter,
) (*TlsPoller, error) {
poller := &TlsPoller{
streams: make(map[string]*TlsStream),
closeStreams: make(chan string, misc.TlsCloseChannelBufferSize),
chunksReader: nil,
Sorter: sorter,
streams: make(map[string]*TlsStream),
closeStreams: make(chan string, misc.TlsCloseChannelBufferSize),
chunksReader: nil,
rawWriter: rawWriter,
gopacketWriter: gopacketWriter,
}

fdCache, err := simplelru.NewLRU(fdCacheMaxItems, poller.fdCacheEvictCallback)
Expand All @@ -46,7 +57,7 @@ func NewTlsPoller(
}
poller.fdCache = fdCache

poller.chunksReader, err = perf.NewReader(bpfObjs.BpfObjs.ChunksBuffer, os.Getpagesize()*10000)
poller.chunksReader, err = perf.NewReader(perfBuffer, os.Getpagesize()*10000)

if err != nil {
return nil, errors.Wrap(err, 0)
Expand Down Expand Up @@ -84,9 +95,30 @@ func (p *TlsPoller) Start() {
}()
}

func (p *TlsPoller) GetLostChunks() uint64 {
return p.lostChunks
}

func (p *TlsPoller) GetReceivedPackets() uint64 {
return p.receivedPackets
}

func (p *TlsPoller) pollChunksPerfBuffer(chunks chan<- *TracerTlsChunk) {
log.Info().Msg("Start polling for tls events")

p.chunksReader.SetDeadline(time.Unix(1, 0))
var emptyRecord perf.Record
for {
err := p.chunksReader.ReadInto(&emptyRecord)
if errors.Is(err, os.ErrDeadlineExceeded) {
break
} else if err != nil {
log.Error().Err(err).Msg("Error reading chunks from pkts perf, aborting!")
return
}
}
p.chunksReader.SetDeadline(time.Time{})

for {
record, err := p.chunksReader.Read()

Expand All @@ -97,12 +129,13 @@ func (p *TlsPoller) pollChunksPerfBuffer(chunks chan<- *TracerTlsChunk) {
return
}

utils.LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
log.Error().Err(err).Msg("Error reading chunks from pkts perf, aborting!")
return
}

if record.LostSamples != 0 {
log.Info().Msg(fmt.Sprintf("Buffer is full, dropped %d chunks", record.LostSamples))
p.lostChunks += record.LostSamples
continue
}

Expand All @@ -111,7 +144,7 @@ func (p *TlsPoller) pollChunksPerfBuffer(chunks chan<- *TracerTlsChunk) {
var chunk TracerTlsChunk

if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
utils.LogError(errors.Errorf("Error parsing chunk %v", err))
log.Error().Err(err).Msg("Error parsing chunk")
continue
}

Expand Down
Loading

0 comments on commit bb40525

Please sign in to comment.