Skip to content

Commit

Permalink
Fix hanging server closes
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed Jul 16, 2024
1 parent 0657bd7 commit cb2c53a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
24 changes: 15 additions & 9 deletions pkg/cachedcli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cachedcli
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"net"
Expand Down Expand Up @@ -109,6 +110,15 @@ func NewCacheDaemonCommand() *cobra.Command {
logger.Info(ctx, "register CSI")
s.RegisterCSI(cached)

mux := http.NewServeMux()
mux.HandleFunc("/healthz", healthzHandler)

healthServer := &http.Server{
Addr: fmt.Sprintf(":%d", healthzPort),
Handler: mux,
BaseContext: func(l net.Listener) context.Context { return ctx },
}

err = cached.Prepare(ctx)
if err != nil {
return fmt.Errorf("failed to prepare cache daemon in %s: %w", stagingPath, err)
Expand All @@ -121,7 +131,7 @@ func NewCacheDaemonCommand() *cobra.Command {
group.Go(func() error {
<-osSignals
s.Grpc.GracefulStop()
return nil
return healthServer.Shutdown(ctx)
})

group.Go(func() error {
Expand All @@ -130,15 +140,11 @@ func NewCacheDaemonCommand() *cobra.Command {
})

group.Go(func() error {
mux := http.NewServeMux()
mux.HandleFunc("/healthz", healthzHandler)

healthServer := &http.Server{
Addr: fmt.Sprintf(":%d", healthzPort),
Handler: mux,
BaseContext: func(l net.Listener) context.Context { return ctx },
err := healthServer.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
return nil
}
return healthServer.ListenAndServe()
return err
})

return group.Wait()
Expand Down
10 changes: 3 additions & 7 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,8 +760,6 @@ func (c *Client) GetCache(ctx context.Context, cacheRootDir string) (int64, uint
}
defer cleanupCacheLockFile(lockFile)

availableVersions := ReadCacheVersionFile(cacheRootDir)

ctx, span := telemetry.Start(ctx, "client.get_cache")
defer span.End()

Expand All @@ -783,11 +781,9 @@ func (c *Client) GetCache(ctx context.Context, cacheRootDir string) (int64, uint
}
version := response.Version

for _, availableVersion := range availableVersions {
if version == availableVersion {
return version, 0, nil
}
}
// We cannot early exist here, even if `version` is already available locally
// since we've opened the GRPC stream we need to go ahead and read the whole thing
// this should be split into 2 requests, one to get the latest version and another to download it.

tarChan := make(chan *pb.GetCacheResponse, 16)
group, ctx := errgroup.WithContext(ctx)
Expand Down

0 comments on commit cb2c53a

Please sign in to comment.