Skip to content

Commit

Permalink
Add DL agent
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed May 5, 2024
1 parent cc8f6bb commit 6e3b0bb
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 2 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ SERVICE := $(PROJECT).server
.PHONY: reset-db setup-local server server-profile install-js
.PHONY: client-update client-large-update client-get client-rebuild client-rebuild-with-cache
.PHONY: client-getcache client-gc-contents client-gc-project client-gc-random-projects
.PHONY: start-agent
.PHONY: health upload-container-image run-container gen-docs
.PHONY: load-test-new load-test-get load-test-update

Expand Down Expand Up @@ -189,6 +190,9 @@ client-gc-random-projects: export DL_SKIP_SSL_VERIFICATION=1
client-gc-random-projects:
go run cmd/client/main.go gc --host $(GRPC_HOST) --mode random-projects --sample 25 --keep 1

start-agent:
go run cmd/client/main.go agent --dir /tmp/dl_agent

health:
grpc-health-probe -addr $(GRPC_SERVER)
grpc-health-probe -addr $(GRPC_SERVER) -service $(SERVICE)
Expand Down
4 changes: 2 additions & 2 deletions internal/files/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func writeObject(rootDir string, cacheObjectsDir string, reader *db.TarReader, h
return err
}
hashHex := hex.EncodeToString(content)
return hardlinkDir(filepath.Join(cacheObjectsDir, hashHex, header.Name), path)
return HardlinkDir(filepath.Join(cacheObjectsDir, hashHex, header.Name), path)

case tar.TypeReg:
dir := filepath.Dir(path)
Expand Down Expand Up @@ -193,7 +193,7 @@ func makeSymlink(oldname, newname string) error {
return nil
}

func hardlinkDir(olddir, newdir string) error {
func HardlinkDir(olddir, newdir string) error {
if fileExists(newdir) {
err := os.RemoveAll(newdir)
if err != nil {
Expand Down
78 changes: 78 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package agent

import (
"context"
"encoding/json"
"net"
"net/http"
"path/filepath"
"strconv"

"github.com/gadget-inc/dateilager/internal/files"
"github.com/gadget-inc/dateilager/internal/logger"
"go.uber.org/zap"
)

type Agent struct {
cacheDir string
port int
}

func NewAgent(cacheDir string, port int) Agent {
return Agent{cacheDir, port}
}

func (a *Agent) Server(ctx context.Context) *http.Server {
http.HandleFunc("GET /healthz", a.healthCheck)
http.HandleFunc("POST /link_cache", a.linkCache)

server := &http.Server{
Addr: ":" + strconv.Itoa(a.port),
BaseContext: func(net.Listener) context.Context { return ctx },
}
return server
}

type healthStatus struct {
Status string `json:"status"`
}

func (a *Agent) healthCheck(resp http.ResponseWriter, req *http.Request) {
err := json.NewEncoder(resp).Encode(healthStatus{Status: "OK"})
if err != nil {
httpErr(req.Context(), resp, err, "failed to encode status")
return
}
resp.WriteHeader(http.StatusOK)
}

type linkRequest struct {
Dir string `json:"dir"`
}

func (a *Agent) linkCache(resp http.ResponseWriter, req *http.Request) {
linkReq := linkRequest{}
err := json.NewDecoder(req.Body).Decode(&linkReq)
if err != nil {
httpReqErr(req.Context(), resp, err, "failed to decode link request")
return
}

err = files.HardlinkDir(a.cacheDir, filepath.Join(linkReq.Dir, "cache"))
if err != nil {
httpErr(req.Context(), resp, err, "failed to link cache director")
return
}

resp.WriteHeader(http.StatusCreated)
}

func httpReqErr(ctx context.Context, resp http.ResponseWriter, err error, message string) {
logger.Warn(ctx, message, zap.Error(err))
http.Error(resp, err.Error(), http.StatusBadRequest)
}

func httpErr(ctx context.Context, resp http.ResponseWriter, err error, message string) {
logger.Error(ctx, message, zap.Error(err))
http.Error(resp, err.Error(), http.StatusInternalServerError)
}
67 changes: 67 additions & 0 deletions pkg/cli/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package cli

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/gadget-inc/dateilager/internal/key"
"github.com/gadget-inc/dateilager/internal/logger"
"github.com/gadget-inc/dateilager/pkg/agent"
"github.com/gadget-inc/dateilager/pkg/client"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

func NewCmdAgent() *cobra.Command {
var (
dir string
port int
)

cmd := &cobra.Command{
Use: "agent",
RunE: func(cmd *cobra.Command, _ []string) error {
ctx := cmd.Context()
c := client.FromContext(ctx)

version, err := c.GetCache(ctx, dir)
if err != nil {
return err
}

logger.Info(ctx, "cache downloaded", key.Version.Field(version), key.Directory.Field(dir))

a := agent.NewAgent(dir, port)

backgroundCtx, cancel := context.WithCancel(ctx)
server := a.Server(backgroundCtx)

osSignals := make(chan os.Signal, 1)
signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM)

go func() {
<-osSignals
logger.Info(ctx, "received interrupt signal")

cancel()

err := server.Shutdown(ctx)
if err != nil {
logger.Error(ctx, "error shutting down server", zap.Error(err))
}
}()

logger.Info(ctx, "start agent", zap.Int("port", port), key.Directory.Field(dir))
return server.ListenAndServe()
},
}

cmd.Flags().StringVar(&dir, "dir", "", "Cache directory")
cmd.Flags().IntVar(&port, "port", 8080, "API server port")

_ = cmd.MarkFlagRequired("path")

return cmd
}
1 change: 1 addition & 0 deletions pkg/cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func NewClientCommand() *cobra.Command {
cmd.AddCommand(NewCmdUpdate())
cmd.AddCommand(NewCmdGc())
cmd.AddCommand(NewCmdGetCache())
cmd.AddCommand(NewCmdAgent())

return cmd
}
Expand Down

0 comments on commit 6e3b0bb

Please sign in to comment.