Skip to content

Commit

Permalink
Cached UDS only
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed Jul 16, 2024
1 parent 5533319 commit cebed78
Show file tree
Hide file tree
Showing 18 changed files with 355 additions and 135 deletions.
36 changes: 20 additions & 16 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ DB_URI := postgres://$(DB_USER):$(DB_PASS)@$(DB_HOST):5432/dl

GRPC_HOST ?= localhost
GRPC_PORT ?= 5051
GRPC_CACHED_PORT ?= 5053

CACHED_SOCKET ?= unix:///tmp/csi.sock

DEV_TOKEN_ADMIN ?= v2.public.eyJzdWIiOiJhZG1pbiJ9yt40HNkcyOUtDeFa_WPS6vi0WiE4zWngDGJLh17TuYvssTudCbOdQEkVDRD-mSNTXLgSRDXUkO-AaEr4ZLO4BQ
DEV_TOKEN_PROJECT_1 ?= v2.public.eyJzdWIiOiIxIn2jV7FOdEXafKDtAnVyDgI4fmIbqU7C1iuhKiL0lDnG1Z5-j6_ObNDd75sZvLZ159-X98_mP4qvwzui0w8pjt8F
Expand All @@ -24,12 +25,13 @@ MIGRATE_DIR := ./migrations
SERVICE := $(PROJECT).server

.PHONY: migrate migrate-create clean build lint release
.PHONY: test test-one test-fuzz test-js lint-js build-js
.PHONY: reset-db setup-local server server-profile install-js
.PHONY: test test-one test-fuzz test-js lint-js install-js build-js
.PHONY: reset-db setup-local build-cache-version server server-profile cached
.PHONY: client-update client-large-update client-get client-rebuild client-rebuild-with-cache
.PHONY: client-getcache client-gc-contents client-gc-project client-gc-random-projects
.PHONY: health upload-container-image run-container gen-docs
.PHONY: load-test-new load-test-get load-test-update
.PHONY: cachedclient-probe cachedclient-populate cachedclient-stats
.PHONY: health upload-container-image upload-prerelease-container-image run-container gen-docs
.PHONY: load-test-new load-test-update load-test-update-large load-test-get load-test-get-compress

migrate:
migrate -database $(DB_URI)?sslmode=disable -path $(MIGRATE_DIR) up
Expand Down Expand Up @@ -114,6 +116,9 @@ reset-db: migrate
setup-local: reset-db
psql $(DB_URI) -c "insert into dl.projects (id, latest_version, pack_patterns) values (1, 0, '{\"node_modules/.*/\"}');"

build-cache-version:
psql $(DB_URI) -c "with impactful_packed_objects as (select hash, count(*) as count from dl.objects where packed = true and stop_version is null group by hash order by count desc limit 20) insert into dl.cache_versions (hashes) select coalesce(array_agg(hash), '{}') from impactful_packed_objects;"

server: export DL_ENV=dev
server: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go
go run cmd/server/main.go --dburi $(DB_URI) --port $(GRPC_PORT)
Expand All @@ -125,12 +130,7 @@ server-profile: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go
cached: export DL_ENV=dev
cached: export DL_TOKEN=$(DEV_SHARED_READER_TOKEN)
cached: internal/pb/cache.pb.go internal/pb/cache_grpc.pb.go
go run cmd/cached/main.go --upstream-host $(GRPC_HOST) --upstream-port $(GRPC_PORT) --port $(GRPC_CACHED_PORT) --staging-path tmp/cache-stage

cached-csi: export DL_ENV=dev
cached-csi: export DL_TOKEN=$(DEV_SHARED_READER_TOKEN)
cached-csi: internal/pb/cache.pb.go internal/pb/cache_grpc.pb.go
go run cmd/cached/main.go --upstream-host $(GRPC_HOST) --upstream-port $(GRPC_PORT) --staging-path tmp/cache-stage --csi-socket unix://tmp/csi.sock
go run cmd/cached/main.go --upstream-host $(GRPC_HOST) --upstream-port $(GRPC_PORT) --csi-socket $(CACHED_SOCKET) --staging-path tmp/cache-stage

client-update: export DL_TOKEN=$(DEV_TOKEN_PROJECT_1)
client-update: export DL_SKIP_SSL_VERIFICATION=1
Expand Down Expand Up @@ -180,11 +180,6 @@ client-getcache: export DL_SKIP_SSL_VERIFICATION=1
client-getcache:
go run cmd/client/main.go getcache --host $(GRPC_HOST) --path input/cache

client-getcached: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-getcached: export DL_SKIP_SSL_VERIFICATION=1
client-getcached:
go run cmd/client/main.go getcached --host $(GRPC_HOST) --port $(GRPC_CACHED_PORT) --path input/cache

client-gc-contents: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-gc-contents: export DL_SKIP_SSL_VERIFICATION=1
client-gc-contents:
Expand All @@ -200,6 +195,15 @@ client-gc-random-projects: export DL_SKIP_SSL_VERIFICATION=1
client-gc-random-projects:
go run cmd/client/main.go gc --host $(GRPC_HOST) --mode random-projects --sample 25 --keep 1

cachedclient-probe:
go run cmd/cached-client/main.go probe --socket $(CACHED_SOCKET)

cachedclient-populate:
go run cmd/cached-client/main.go populate --socket $(CACHED_SOCKET) --path input/cache

cachedclient-stats:
go run cmd/cached-client/main.go stats --socket $(CACHED_SOCKET)

health:
grpc-health-probe -addr $(GRPC_SERVER)
grpc-health-probe -addr $(GRPC_SERVER) -service $(SERVICE)
Expand Down
7 changes: 7 additions & 0 deletions cmd/cached-client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "github.com/gadget-inc/dateilager/pkg/cachedcli"

func main() {
cachedcli.ClientExecute()
}
4 changes: 2 additions & 2 deletions cmd/cached/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package main

import "github.com/gadget-inc/dateilager/pkg/cli"
import "github.com/gadget-inc/dateilager/pkg/cachedcli"

func main() {
cli.CacheDaemonExecute()
cachedcli.CacheDaemonExecute()
}
2 changes: 1 addition & 1 deletion cmd/fuzz-test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,5 +682,5 @@ func main() {
}

logger.Info(ctx, "fuzz test completed")
_ = logger.Sync()
_ = logger.Sync(ctx)
}
1 change: 1 addition & 0 deletions internal/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
QueryPath = StringKey("dl.query.path")
SampleRate = Float32Key("dl.sample_rate")
Server = StringKey("dl.server")
Socket = StringKey("dl.socket")
State = StringKey("dl.state")
Template = Int64pKey("dl.template")
ToVersion = Int64pKey("dl.to_version")
Expand Down
4 changes: 2 additions & 2 deletions internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func Logger(ctx context.Context) *zap.Logger {
return zap.L()
}

func Sync() error {
return zap.L().Sync()
func Sync(ctx context.Context) error {
return Logger(ctx).Sync()
}

func Debug(ctx context.Context, msg string, fields ...zap.Field) {
Expand Down
5 changes: 0 additions & 5 deletions pkg/api/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ func (c *Cached) PopulateDiskCache(ctx context.Context, req *pb.PopulateDiskCach
return nil, status.Errorf(codes.Unimplemented, "Cached populateDiskCache only implemented in dev and test environments")
}

err := requireAdminAuth(ctx)
if err != nil {
return nil, err
}

destination := req.Path

version, err := c.writeCache(destination)
Expand Down
40 changes: 6 additions & 34 deletions pkg/cached/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package cached

import (
"context"
"crypto/ed25519"
"crypto/tls"
"fmt"
"net"
"net/url"
Expand All @@ -12,54 +10,32 @@ import (
"path/filepath"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/gadget-inc/dateilager/internal/auth"
"github.com/gadget-inc/dateilager/internal/logger"
"github.com/gadget-inc/dateilager/internal/pb"
"github.com/gadget-inc/dateilager/pkg/api"
"github.com/gadget-inc/dateilager/pkg/server"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

type CachedServer struct {
Grpc *grpc.Server
Health *health.Server
Grpc *grpc.Server
}

func NewServer(ctx context.Context, cert *tls.Certificate, pasetoKey ed25519.PublicKey) *CachedServer {
creds := credentials.NewServerTLSFromCert(cert)
validator := auth.NewAuthValidator(pasetoKey)

func NewServer(ctx context.Context) *CachedServer {
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
grpc_recovery.UnaryServerInterceptor(),
otelgrpc.UnaryServerInterceptor(),
logger.UnaryServerInterceptor(),
server.ValidateTokenUnary(validator),
),
),
grpc.ReadBufferSize(server.BUFFER_SIZE),
grpc.WriteBufferSize(server.BUFFER_SIZE),
grpc.InitialConnWindowSize(server.INITIAL_CONN_WINDOW_SIZE),
grpc.InitialWindowSize(server.INITIAL_WINDOW_SIZE),
grpc.MaxRecvMsgSize(server.MAX_MESSAGE_SIZE),
grpc.MaxSendMsgSize(server.MAX_MESSAGE_SIZE),
grpc.Creds(creds),
)

logger.Info(ctx, "register HealthServer")
healthServer := health.NewServer()
healthpb.RegisterHealthServer(grpcServer, healthServer)

server := &CachedServer{
Grpc: grpcServer,
Health: healthServer,
Grpc: grpcServer,
}

return server
Expand All @@ -74,14 +50,10 @@ func (s *CachedServer) RegisterCSI(cached *api.Cached) {
csi.RegisterNodeServer(s.Grpc, cached)
}

func (s *CachedServer) Serve(lis net.Listener) error {
return s.Grpc.Serve(lis)
}

func (s *CachedServer) ServeCSI(listenSocketPath string) error {
u, err := url.Parse(listenSocketPath)
func (s *CachedServer) Serve(socketPath string) error {
u, err := url.Parse(socketPath)
if err != nil {
return fmt.Errorf("unable to parse address: %q", err)
return fmt.Errorf("unable to parse socket address: %q", err)
}

addr := path.Join(u.Host, filepath.FromSlash(u.Path))
Expand Down
145 changes: 145 additions & 0 deletions pkg/cachedcli/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package cachedcli

import (
"context"
"encoding/json"
"flag"
"fmt"
"strings"
"time"

"github.com/gadget-inc/dateilager/internal/logger"
"github.com/gadget-inc/dateilager/internal/telemetry"
"github.com/gadget-inc/dateilager/pkg/client"
"github.com/gadget-inc/dateilager/pkg/version"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
shutdownTelemetry func()
span trace.Span
)

func NewCachedClientCommand() *cobra.Command {
var (
level *zapcore.Level
encoding string
tracing bool
otelContext string
socket string
timeout uint
)

var cancel context.CancelFunc

cmd := &cobra.Command{
Use: "cachedclient",
Short: "DateiLager cached client",
DisableAutoGenTag: true,
Version: version.Version,
SilenceErrors: true,
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
cmd.SilenceUsage = true // silence usage when an error occurs after flags have been parsed

config := zap.NewProductionConfig()
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
config.Level = zap.NewAtomicLevelAt(*level)
config.Encoding = encoding

err := logger.Init(config)
if err != nil {
return fmt.Errorf("could not initialize logger: %w", err)
}

ctx := cmd.Context()

if timeout != 0 {
ctx, cancel = context.WithTimeout(cmd.Context(), time.Duration(timeout)*time.Millisecond)
}

if tracing {
shutdownTelemetry = telemetry.Init(ctx, telemetry.Client)
}

if otelContext != "" {
var mapCarrier propagation.MapCarrier
err := json.NewDecoder(strings.NewReader(otelContext)).Decode(&mapCarrier)
if err != nil {
return fmt.Errorf("failed to decode otel-context: %w", err)
}

ctx = otel.GetTextMapPropagator().Extract(ctx, mapCarrier)
}

ctx, span = telemetry.Start(ctx, "cached-cmd.main")

if socket == "" {
return fmt.Errorf("required flag(s) \"socket\" not set")
}

cl, err := client.NewCachedUnixClient(ctx, socket)
if err != nil {
return err
}
ctx = client.CachedIntoContext(ctx, cl)

cmd.SetContext(ctx)

return nil
},
PersistentPostRunE: func(cmd *cobra.Command, _ []string) error {
if cancel != nil {
cancel()
}
return nil
},
}

flags := cmd.PersistentFlags()

level = zap.LevelFlag("log-level", zap.DebugLevel, "Log level")
flags.AddGoFlag(flag.CommandLine.Lookup("log-level"))
flags.StringVar(&encoding, "log-encoding", "console", "Log encoding (console | json)")
flags.BoolVar(&tracing, "tracing", false, "Whether tracing is enabled")
flags.StringVar(&otelContext, "otel-context", "", "Open Telemetry context")

flags.StringVar(&socket, "socket", "", "Unix domain socket path")
flags.UintVar(&timeout, "timeout", 0, "GRPC client timeout (ms)")

_ = cmd.MarkFlagRequired("socket")

cmd.AddCommand(NewCmdPopulate())
cmd.AddCommand(NewCmdProbe())

return cmd
}

func ClientExecute() {
ctx := context.Background()
cmd := NewCachedClientCommand()
err := cmd.ExecuteContext(ctx)

client := client.FromContext(cmd.Context())
if client != nil {
client.Close()
}

if span != nil {
span.End()
}

if shutdownTelemetry != nil {
shutdownTelemetry()
}

_ = logger.Sync(ctx)

if err != nil {
logger.Fatal(ctx, "cached client failed", zap.Error(err))
}
}
6 changes: 3 additions & 3 deletions pkg/cli/getcached.go → pkg/cachedcli/populate.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cli
package cachedcli

import (
"github.com/gadget-inc/dateilager/internal/key"
Expand All @@ -7,13 +7,13 @@ import (
"github.com/spf13/cobra"
)

func NewCmdGetCacheFromDaemon() *cobra.Command {
func NewCmdPopulate() *cobra.Command {
var (
path string
)

cmd := &cobra.Command{
Use: "getcached",
Use: "populate",
RunE: func(cmd *cobra.Command, _ []string) error {
ctx := cmd.Context()
c := client.CachedFromContext(ctx)
Expand Down
Loading

0 comments on commit cebed78

Please sign in to comment.