Skip to content

Commit

Permalink
feat(metricsharding): truncate merged blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Oct 4, 2023
1 parent 54d96a5 commit cc247c9
Showing 1 changed file with 73 additions and 23 deletions.
96 changes: 73 additions & 23 deletions internal/metricsharding/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,17 @@ func (s *Sharder) archiveAttributes(ctx context.Context,
return errors.Wrapf(err, "create static table %q", targetPath)
}

lg := zctx.From(ctx)

blocks, err := s.getBlocks(ctx, activePath, start, end)
if err != nil {
return errors.Wrap(err, "get attribute blocks to merge")
}
if len(blocks) == 0 {
// No blocks to merge.
lg.Info("No blocks to merge", zap.Stringer("to", targetPath))
return nil
}

opSpec := spec.Merge()
opSpec.MergeMode = "ordered"

Check failure on line 83 in internal/metricsharding/archive.go

View workflow job for this annotation

GitHub Actions / lint / run

string `ordered` has 3 occurrences, make it a constant (goconst)
Expand All @@ -79,12 +86,11 @@ func (s *Sharder) archiveAttributes(ctx context.Context,
}
opSpec.OutputTablePath = targetPath

lg := zctx.From(ctx)
op, err := s.mapreduce.Merge(opSpec)
if err != nil {
return errors.Wrap(err, "run merge operation")
}
lg.Info("Run merge operation",
lg.Info("Run attribute merge operation",
zap.Stringer("id", op.ID()),
zap.Stringer("from", activePath),
zap.Stringer("to", targetPath),
Expand All @@ -95,9 +101,23 @@ func (s *Sharder) archiveAttributes(ctx context.Context,
}
lg.Info("Merge operation done", zap.Stringer("id", op.ID()))

if err := batchRemove(ctx, s.yc, opSpec.InputTablePaths...); err != nil {
return errors.Wrap(err, "remove closed block attributes")
}
return nil
}

// batchRemove removes every node in paths concurrently, issuing one
// RemoveNode call (with nil options) per path through an errgroup.
//
// It blocks until all removals have finished and returns the first non-nil
// error reported by the group, or nil if every call succeeded or paths is
// empty. A failed removal does not cancel the others: all calls share the
// caller's ctx, and the group is not bound to a derived context.
func batchRemove[T ypath.YPath](ctx context.Context, yc yt.CypressClient, paths ...T) error {
	var g errgroup.Group
	for i := range paths {
		// Bind the element to a per-iteration variable so each closure
		// removes its own path.
		target := paths[i]
		g.Go(func() error {
			return yc.RemoveNode(ctx, target, nil)
		})
	}
	return g.Wait()
}

func (s *Sharder) archivePoints(ctx context.Context,
tenantPath ypath.Path,
start, end time.Time,
Expand All @@ -106,6 +126,8 @@ func (s *Sharder) archivePoints(ctx context.Context,
var (
activePath = tenantPath.Child("active").Child(table)
targetPath = tenantPath.Child("closed").Child(start.Format(timeBlockLayout)).Child(table)

lg = zctx.From(ctx)
)

if _, err := yt.CreateTable(ctx, s.yc, targetPath,
Expand All @@ -116,30 +138,58 @@ func (s *Sharder) archivePoints(ctx context.Context,
return errors.Wrapf(err, "create static table %q", targetPath)
}

opSpec := spec.Merge()
opSpec.MergeMode = "ordered"
opSpec.InputTablePaths = []ypath.YPath{activePath}
opSpec.OutputTablePath = targetPath
opSpec.InputQuery = fmt.Sprintf(
"* WHERE timestamp >= %d AND timestamp < %d",
start.UnixNano(), end.UnixNano(),
)

lg := zctx.From(ctx)
op, err := s.mapreduce.Merge(opSpec)
if err != nil {
return errors.Wrap(err, "run merge operation")
{
mergeSpec := spec.Merge()
mergeSpec.MergeMode = "ordered"
mergeSpec.InputTablePaths = []ypath.YPath{activePath}
mergeSpec.OutputTablePath = targetPath
mergeSpec.InputQuery = fmt.Sprintf(
"* WHERE timestamp >= %d AND timestamp < %d",
start.UnixNano(), end.UnixNano(),
)

op, err := s.mapreduce.Merge(mergeSpec)
if err != nil {
return errors.Wrap(err, "run merge operation")
}
lg.Info("Run merge operation",
zap.Stringer("id", op.ID()),
zap.Stringer("from", activePath),
zap.Stringer("to", targetPath),
)

if err := op.Wait(); err != nil {
return errors.Wrapf(err, "wait operation %q", op.ID())
}
lg.Info("Merge operation done", zap.Stringer("id", op.ID()))
}
lg.Info("Run merge operation",
zap.Stringer("id", op.ID()),
zap.Stringer("from", activePath),
zap.Stringer("to", targetPath),
)

if err := op.Wait(); err != nil {
return errors.Wrapf(err, "wait operation %q", op.ID())
{
// See https://ytsaurus.tech/docs/en/user-guide/dynamic-tables/bulk-insert#delete-where-via-input-query.
truncateSpec := spec.Merge()
truncateSpec.MergeMode = "ordered"
truncateSpec.InputTablePaths = []ypath.YPath{
// Set attributes as guide says.
`<append=%true; schema_modification="unversioned_update">` + activePath,
}
truncateSpec.OutputTablePath = activePath
// Truncate all rows older than current block start.
truncateSpec.InputQuery = fmt.Sprintf("* WHERE timestamp >= %d", end.UnixNano())

op, err := s.mapreduce.Merge(truncateSpec)
if err != nil {
return errors.Wrap(err, "run truncate operation")
}
lg.Info("Run truncate operation",
zap.Stringer("id", op.ID()),
zap.Stringer("table", activePath),
)

if err := op.Wait(); err != nil {
return errors.Wrapf(err, "wait operation %q", op.ID())
}
lg.Info("Truncate operation done", zap.Stringer("id", op.ID()))
}
lg.Info("Merge operation done", zap.Stringer("id", op.ID()))

return nil
}

0 comments on commit cc247c9

Please sign in to comment.