Skip to content

Commit

Permalink
Merge pull request #29 from GreenmaskIO/validate_schema_diff
Browse files Browse the repository at this point in the history
Validate schema diff feature
  • Loading branch information
wwoytenko authored Mar 15, 2024
2 parents 7eb2009 + 6d04e03 commit 6af4717
Show file tree
Hide file tree
Showing 11 changed files with 537 additions and 46 deletions.
23 changes: 21 additions & 2 deletions cmd/greenmask/cmd/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package validate

import (
"context"
"os"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
Expand All @@ -24,6 +25,7 @@ import (
cmdInternals "github.com/greenmaskio/greenmask/internal/db/postgres/cmd"
"github.com/greenmaskio/greenmask/internal/db/postgres/transformers/utils"
"github.com/greenmaskio/greenmask/internal/domains"
"github.com/greenmaskio/greenmask/internal/storages/builder"
"github.com/greenmaskio/greenmask/internal/utils/logger"
)

Expand Down Expand Up @@ -66,15 +68,23 @@ func run(cmd *cobra.Command, args []string) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
st, err := builder.GetStorage(ctx, &Config.Storage, &Config.Log)
if err != nil {
log.Fatal().Err(err).Msg("fatal")
}

validate, err := cmdInternals.NewValidate(Config, utils.DefaultTransformerRegistry)
validate, err := cmdInternals.NewValidate(Config, utils.DefaultTransformerRegistry, st)
if err != nil {
log.Fatal().Err(err).Msg("")
}

if err := validate.Run(ctx); err != nil {
exitCode, err := validate.Run(ctx)
if err != nil {
log.Fatal().Err(err).Msg("")
}
if exitCode != 0 {
os.Exit(exitCode)
}
}

func init() {
Expand Down Expand Up @@ -150,4 +160,13 @@ func init() {
log.Fatal().Err(err).Msg("fatal")
}

schemaFlagName := "schema"
Cmd.Flags().Bool(
schemaFlagName, false, "Make a schema diff between previous dump and the current state",
)
flag = Cmd.Flags().Lookup(schemaFlagName)
if err := viper.BindPFlag("validate.schema", flag); err != nil {
log.Fatal().Err(err).Msg("fatal")
}

}
1 change: 1 addition & 0 deletions internal/db/postgres/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ func (d *Dump) mergeAndWriteToc(ctx context.Context, tx pgx.Tx) error {
func (d *Dump) writeMetaData(ctx context.Context, startedAt, completedAt time.Time) error {
metadata, err := storageDto.NewMetadata(
d.resultToc, d.tocFileSize, startedAt, completedAt, d.config.Dump.Transformation, d.dumpedObjectSizes,
d.context.DatabaseSchema,
)
if err != nil {
return fmt.Errorf("unable build metadata: %w", err)
Expand Down
179 changes: 150 additions & 29 deletions internal/db/postgres/cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"slices"
"strconv"
"strings"
Expand All @@ -20,12 +20,12 @@ import (
runtimeContext "github.com/greenmaskio/greenmask/internal/db/postgres/context"
"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
"github.com/greenmaskio/greenmask/internal/db/postgres/pgcopy"
storageDto "github.com/greenmaskio/greenmask/internal/db/postgres/storage"
"github.com/greenmaskio/greenmask/internal/db/postgres/toc"
"github.com/greenmaskio/greenmask/internal/db/postgres/transformers/custom"
"github.com/greenmaskio/greenmask/internal/db/postgres/transformers/utils"
"github.com/greenmaskio/greenmask/internal/domains"
"github.com/greenmaskio/greenmask/internal/storages"
"github.com/greenmaskio/greenmask/internal/storages/directory"
"github.com/greenmaskio/greenmask/internal/utils/reader"
"github.com/greenmaskio/greenmask/pkg/toolkit"
)
Expand All @@ -40,51 +40,59 @@ const (
HorizontalTableFormat = "horizontal"
)

const (
nonZeroExitCode = 1
zeroExitCode = 0
)

type closeFunc func()

type Validate struct {
*Dump
tmpDir string
tmpDir string
mainSt storages.Storager
exitCode int
}

func NewValidate(cfg *domains.Config, registry *utils.TransformerRegistry) (*Validate, error) {
var st storages.Storager
st, err := directory.NewStorage(&directory.Config{Path: cfg.Common.TempDirectory})
if err != nil {
return nil, fmt.Errorf("error initializing storage")
}
tmpDir := strconv.FormatInt(time.Now().UnixMilli(), 10)
st = st.SubStorage(tmpDir, true)
func NewValidate(cfg *domains.Config, registry *utils.TransformerRegistry, st storages.Storager) (*Validate, error) {
mainSt := st
tmpDirName := strconv.FormatInt(time.Now().UnixMilli(), 10)
st = st.SubStorage(tmpDirName, true)

d := NewDump(cfg, st, registry)
d.dumpIdSequence = toc.NewDumpSequence(0)

d.validate = true
return &Validate{
Dump: d,
tmpDir: path.Join(cfg.Common.TempDirectory, tmpDir),
Dump: d,
tmpDir: tmpDirName,
exitCode: zeroExitCode,
mainSt: mainSt,
}, nil
}

func (v *Validate) Run(ctx context.Context) error {
func (v *Validate) Run(ctx context.Context) (int, error) {

defer func() {
// Deleting temp dir after closing it
if err := os.RemoveAll(v.tmpDir); err != nil {
log.Warn().Err(err).Msgf("unable to delete temp directory")
if !v.config.Validate.Diff {
return
}
if err := v.mainSt.Delete(ctx, v.tmpDir); err != nil {
log.Warn().Err(err).Msg("error deleting temporary directory")
}
}()
if err := custom.BootstrapCustomTransformers(ctx, v.registry, v.config.CustomTransformers); err != nil {
return fmt.Errorf("error bootstraping custom transformers: %w", err)
return nonZeroExitCode, fmt.Errorf("error bootstraping custom transformers: %w", err)
}

dsn, err := v.pgDumpOptions.GetPgDSN()
if err != nil {
return fmt.Errorf("cannot build connection string: %w", err)
return nonZeroExitCode, fmt.Errorf("cannot build connection string: %w", err)
}

conn, err := v.connect(ctx, dsn)
if err != nil {
return err
return nonZeroExitCode, err
}
defer func() {
if err := conn.Close(ctx); err != nil {
Expand All @@ -94,7 +102,7 @@ func (v *Validate) Run(ctx context.Context) error {

tx, err := v.startMainTx(ctx, conn)
if err != nil {
return fmt.Errorf("cannot prepare backup transaction: %w", err)
return nonZeroExitCode, fmt.Errorf("cannot prepare backup transaction: %w", err)
}
defer func() {
if err := tx.Rollback(ctx); err != nil {
Expand All @@ -103,39 +111,43 @@ func (v *Validate) Run(ctx context.Context) error {
}()

if err = v.gatherPgFacts(ctx, tx); err != nil {
return fmt.Errorf("error gathering facts: %w", err)
return nonZeroExitCode, fmt.Errorf("error gathering facts: %w", err)
}

// Get list of tables to validate
tablesToValidate, err := v.getTablesToValidate()
if err != nil {
return err
return nonZeroExitCode, err
}
v.config.Dump.Transformation = tablesToValidate

v.context, err = runtimeContext.NewRuntimeContext(ctx, tx, v.config.Dump.Transformation, v.registry,
v.pgDumpOptions, v.version)
if err != nil {
return fmt.Errorf("unable to build runtime context: %w", err)
return nonZeroExitCode, fmt.Errorf("unable to build runtime context: %w", err)
}

if err = v.printValidationWarnings(); err != nil {
return err
return nonZeroExitCode, err
}

if err = v.diffWithPreviousSchema(ctx); err != nil {
return nonZeroExitCode, err
}

if !v.config.Validate.Data {
return nil
return v.exitCode, nil
}

if err = v.dumpTables(ctx); err != nil {
return err
return nonZeroExitCode, err
}

if err = v.print(ctx); err != nil {
return err
return nonZeroExitCode, err
}

return nil
return v.exitCode, nil
}

func (v *Validate) print(ctx context.Context) error {
Expand Down Expand Up @@ -328,6 +340,115 @@ func (v *Validate) getTablesToValidate() ([]*domains.Table, error) {
return tablesToValidate, nil
}

func (v *Validate) diffWithPreviousSchema(ctx context.Context) error {
if !v.config.Validate.Schema {
return nil
}

dumpId, err := v.getPreviousDumpId(ctx)
if err != nil {
return fmt.Errorf("cannot get previous dump id: %w", err)
}
if dumpId == "" {
return nil
}

md, err := v.getPreviousMetadata(ctx, dumpId)
if err != nil {
return fmt.Errorf("cannot get previous metadata: %w", err)
}

diff := md.DatabaseSchema.Diff(v.context.DatabaseSchema)
if len(diff) > 0 {
v.exitCode = nonZeroExitCode

err = v.printSchemaDiff(diff, dumpId)
if err != nil {
return fmt.Errorf("cannot print schema diff: %w", err)
}
}

return nil
}

func (v *Validate) printSchemaDiff(diff []*toolkit.DiffNode, previousDumpId string) error {

if v.config.Validate.Format == JsonFormat {
data, err := json.Marshal(diff)
if err != nil {
return fmt.Errorf("cannot encode diff node: %w", err)
}
log.Warn().
Str("PreviousDumpId", previousDumpId).
RawJSON("Diff", data).
Str("Hint", "Check schema changes before making new dump").
Msg("Database schema has been changed")
return nil
}
log.Warn().
Str("PreviousDumpId", previousDumpId).
Str("Hint", "Check schema changes before making new dump").
Msg("Database schema has been changed")

for _, node := range diff {
log.Warn().
Str("Event", node.Event).
Any("Signature", node.Signature).
Msg(toolkit.DiffEventMsgs[node.Event])
}

return nil
}

func (v *Validate) getPreviousDumpId(ctx context.Context) (string, error) {
var backupNames []string

_, dirs, err := v.mainSt.ListDir(ctx)
if err != nil {
return "", fmt.Errorf("cannot walk through directory: %w", err)
}
for _, dir := range dirs {
exists, err := dir.Exists(ctx, "metadata.json")
if err != nil {
return "", fmt.Errorf("cannot check file existence: %w", err)
}
if exists {
backupNames = append(backupNames, dir.Dirname())
}
}

slices.SortFunc(
backupNames, func(a, b string) int {
if a > b {
return -1
}
return 1
},
)
if len(backupNames) > 0 {
return backupNames[0], nil
}
return "", nil
}

func (v *Validate) getPreviousMetadata(ctx context.Context, dumpId string) (*storageDto.Metadata, error) {

st := v.mainSt.SubStorage(dumpId, true)

f, err := st.GetObject(ctx, MetadataJsonFileName)
if err != nil {
return nil, fmt.Errorf("cannot open metadata file: %w", err)
}
defer f.Close()

previousMetadata := &storageDto.Metadata{}

if err = json.NewDecoder(f).Decode(&previousMetadata); err != nil {
return nil, fmt.Errorf("cannot decode metadata file: %w", err)
}
return previousMetadata, nil
}

func findTableBySchemaAndName(Transformations []*domains.Table, schemaName, tableName string) (*domains.Table, error) {
var foundTable *domains.Table
for _, t := range Transformations {
Expand Down
8 changes: 8 additions & 0 deletions internal/db/postgres/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type RuntimeContext struct {
Registry *transformersUtils.TransformerRegistry
// TypeMap - map of registered types including custom types. It's common for the whole runtime
TypeMap *pgtype.Map
// DatabaseSchema - list of tables with columns - required for schema diff checking
DatabaseSchema toolkit.DatabaseSchema
}

// NewRuntimeContext - creating new runtime context.
Expand Down Expand Up @@ -71,12 +73,18 @@ func NewRuntimeContext(
return nil, fmt.Errorf("cannot build dump object list: %w", err)
}

schema, err := getDatabaseSchema(ctx, tx, opt)
if err != nil {
return nil, fmt.Errorf("cannot get database schema: %w", err)
}

return &RuntimeContext{
Tables: tables,
Types: types,
DataSectionObjects: dataSectionObjects,
Warnings: warnings,
Registry: r,
DatabaseSchema: schema,
}, nil
}

Expand Down
Loading

0 comments on commit 6af4717

Please sign in to comment.