Skip to content

Commit

Permalink
Merge pull request #97 from gadget-inc/various_fixes
Browse files Browse the repository at this point in the history
Various fixes
  • Loading branch information
angelini authored Jul 31, 2024
2 parents c8ba2b5 + f4ac454 commit 8ea7149
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ server: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go

server-profile: export DL_ENV=dev
server-profile: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go
go run cmd/server/main.go --dburi $(DB_URI) --port $(GRPC_PORT) --profile cpu.prof --log-level info
go run cmd/server/main.go --dburi $(DB_URI) --port $(GRPC_PORT) --profile cpu.prof --memprofile mem.pb.gz --log-level info

cached: export DL_ENV=dev
cached: export DL_TOKEN=$(DEV_SHARED_READER_TOKEN)
Expand Down
10 changes: 10 additions & 0 deletions internal/db/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ func (cl *ContentLookup) Lookup(ctx context.Context, tx pgx.Tx, hashesToLookup m
contents[hash] = value
}
}

err = rows.Err()
if err != nil {
return nil, fmt.Errorf("failed to iterate rows: %w", err)
}
}

return contents, nil
Expand Down Expand Up @@ -262,5 +267,10 @@ func RandomContents(ctx context.Context, conn DbConnector, sample float32) ([]Ha
hashes = append(hashes, hash)
}

err = rows.Err()
if err != nil {
return nil, fmt.Errorf("failed to iterate rows: %w", err)
}

return hashes, nil
}
5 changes: 5 additions & 0 deletions internal/db/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func GcProjectObjects(ctx context.Context, conn DbConnector, project int64, keep
hashes = append(hashes, hash)
}

err = rows.Err()
if err != nil {
return nil, fmt.Errorf("failed to iterate rows: %w", err)
}

return hashes, nil
}

Expand Down
10 changes: 10 additions & 0 deletions internal/db/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func ListProjects(ctx context.Context, tx pgx.Tx) ([]*pb.Project, error) {
projects = append(projects, &pb.Project{Id: id, Version: version})
}

err = rows.Err()
if err != nil {
return nil, fmt.Errorf("failed to iterate rows: %w", err)
}

return projects, nil
}

Expand Down Expand Up @@ -96,6 +101,11 @@ func RandomProjects(ctx context.Context, conn DbConnector, sample float32) ([]in
projects = append(projects, project)
}

err = rows.Err()
if err != nil {
return nil, fmt.Errorf("failed to iterate rows: %w", err)
}

// With only few projects this sometimes returns no results.
if len(projects) > 0 {
break
Expand Down
14 changes: 10 additions & 4 deletions internal/db/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,8 @@ func executeQuery(ctx context.Context, tx pgx.Tx, queryBuilder *queryBuilder) ([
defer rows.Close()

var dbObjects []DbObject
for {
if !rows.Next() {
break
}

for rows.Next() {
var object DbObject

err := rows.Scan(&object.path, &object.mode, &object.size, &object.cached, &object.packed, &object.deleted, &object.hash.H1, &object.hash.H2)
Expand All @@ -136,6 +133,11 @@ func executeQuery(ctx context.Context, tx pgx.Tx, queryBuilder *queryBuilder) ([
dbObjects = append(dbObjects, object)
}

err = rows.Err()
if err != nil {
return nil, fmt.Errorf("failed to iterate rows: %w", err)
}

return dbObjects, nil
}

Expand Down Expand Up @@ -351,6 +353,10 @@ func GetCacheTars(ctx context.Context, tx pgx.Tx) (cacheTarStream, CloseFunc, er

return func() (int64, []byte, *Hash, error) {
if !rows.Next() {
err := rows.Err()
if err != nil {
return 0, nil, nil, fmt.Errorf("failed to iterate rows: %w", err)
}
return 0, nil, nil, io.EOF
}

Expand Down
5 changes: 5 additions & 0 deletions internal/db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, proje
}
rows.Close()

err = rows.Err()
if err != nil {
return false, fmt.Errorf("failed to iterate rows: %w", err)
}

shouldInsert := true
updated, err := updateObjects(content, updates)
if errors.Is(err, ErrEmptyPack) {
Expand Down
38 changes: 28 additions & 10 deletions pkg/cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"syscall"

Expand All @@ -34,15 +35,16 @@ func NewServerCommand() *cobra.Command {
)

var (
level *zapcore.Level
encoding string
tracing bool
profilePath string
port int
dbUri string
certFile string
keyFile string
pasetoFile string
level *zapcore.Level
encoding string
tracing bool
profilePath string
memProfilePath string
port int
dbUri string
certFile string
keyFile string
pasetoFile string
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -128,6 +130,21 @@ func NewServerCommand() *cobra.Command {
signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM)
go func() {
<-osSignals

if memProfilePath != "" {
memProfile, err := os.Create(memProfilePath)
if err != nil {
logger.Error(ctx, "cannot create heap profile file", zap.Error(err), zap.String("path", memProfilePath))
}
defer memProfile.Close()
runtime.GC()

err = pprof.WriteHeapProfile(memProfile)
if err != nil {
logger.Error(ctx, "cannot write heap profile", zap.Error(err), zap.String("path", memProfilePath))
}
}

s.Grpc.GracefulStop()
}()

Expand All @@ -153,7 +170,8 @@ func NewServerCommand() *cobra.Command {
flags.AddGoFlag(flag.CommandLine.Lookup("log-level"))
flags.StringVar(&encoding, "log-encoding", "console", "Log encoding (console | json)")
flags.BoolVar(&tracing, "tracing", false, "Whether tracing is enabled")
flags.StringVar(&profilePath, "profile", "", "CPU profile output path (profiling enabled if set)")
flags.StringVar(&profilePath, "profile", "", "CPU profile output path (CPU profiling enabled if set)")
flags.StringVar(&memProfilePath, "memprofile", "", "Memory profile output path")

flags.IntVar(&port, "port", 5051, "GRPC server port")
flags.StringVar(&dbUri, "dburi", "postgres://postgres@127.0.0.1:5432/dl", "Postgres URI")
Expand Down
11 changes: 7 additions & 4 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,12 @@ func (d *DbPoolConnector) Connect(ctx context.Context) (pgx.Tx, db.CloseFunc, er
return nil, nil, err
}

typeMap := conn.Conn().TypeMap()
for _, extraType := range d.extraTypes {
conn.Conn().TypeMap().RegisterType(extraType)
_, found := typeMap.TypeForOID(extraType.OID)
if !found {
typeMap.RegisterType(extraType)
}
}

tx, err := conn.Begin(ctx)
Expand Down Expand Up @@ -177,15 +181,14 @@ func NewServer(ctx context.Context, dbConn *DbPoolConnector, cert *tls.Certifica
}

func (s *Server) monitorDbPool(ctx context.Context, dbConn *DbPoolConnector) {
ticker := time.NewTicker(time.Second)

go func() {
for {
timer := time.NewTimer(time.Second)
select {
case <-ctx.Done():
s.Health.SetServingStatus("dateilager.server", healthpb.HealthCheckResponse_NOT_SERVING)
return
case <-ticker.C:
case <-timer.C:
ctxTimeout, cancel := context.WithTimeout(ctx, 800*time.Millisecond)

status := healthpb.HealthCheckResponse_SERVING
Expand Down
1 change: 1 addition & 0 deletions test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func latestCacheVersionHashes(tc util.TestCtx) (int64, []db.Hash) {

hashes = append(hashes, hash)
}
require.NoError(tc.T(), rows.Err(), "iterate rows")

return version, hashes
}
Expand Down
2 changes: 2 additions & 0 deletions test/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ func debugProjects(tc util.TestCtx) {

fmt.Printf("%d,\t%d,\t\t%v\n", id, latestVersion, packPatterns)
}
require.NoError(tc.T(), rows.Err(), "iterate rows")

fmt.Println()
}
Expand Down Expand Up @@ -755,6 +756,7 @@ func debugObjects(tc util.TestCtx) {

fmt.Printf("%d,\t\t%d,\t\t%s,\t\t%s,\t%s,\t%d,\t%v,\t(%x, %x)\n", project, start_version, formatPtr(stop_version), path, formatMode(mode), size, packed, h1, h2)
}
require.NoError(tc.T(), rows.Err(), "iterate rows")

fmt.Println()
}
Expand Down

0 comments on commit 8ea7149

Please sign in to comment.