Skip to content

Commit

Permalink
PEERDB_CLICKHOUSE_MAX_INSERT_THREADS (#2255)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Nov 14, 2024
1 parent 42b0208 commit 79732fd
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
12 changes: 10 additions & 2 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewClickHouseConnector(
config *protos.ClickhouseConfig,
) (*ClickHouseConnector, error) {
logger := shared.LoggerFromCtx(ctx)
database, err := Connect(ctx, config)
database, err := Connect(ctx, env, config)
if err != nil {
return nil, fmt.Errorf("failed to open connection to ClickHouse peer: %w", err)
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func NewClickHouseConnector(
return connector, nil
}

func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
var tlsSetting *tls.Config
if !config.DisableTls {
tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13}
Expand All @@ -228,6 +228,13 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C
tlsSetting.RootCAs = caPool
}

var settings clickhouse.Settings
if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil {
return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err)
} else if maxInsertThreads != 0 {
settings = clickhouse.Settings{"max_insert_threads": maxInsertThreads}
}

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
Auth: clickhouse.Auth{
Expand All @@ -245,6 +252,7 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C
{Name: "peerdb"},
},
},
Settings: settings,
DialTimeout: 3600 * time.Second,
ReadTimeout: 3600 * time.Second,
})
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s ClickHouseSuite) Teardown() {
}

func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) {
ch, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig())
ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func SetupSuite(t *testing.T) ClickHouseSuite {
s3Helper: s3Helper,
}

ch, err := connclickhouse.Connect(context.Background(), s.PeerForDatabase("default").GetClickhouseConfig())
ch, err := connclickhouse.Connect(context.Background(), nil, s.PeerForDatabase("default").GetClickhouseConfig())
require.NoError(t, err, "failed to connect to clickhouse")
err = ch.Exec(context.Background(), "CREATE DATABASE e2e_test_"+suffix)
require.NoError(t, err, "failed to create clickhouse database")
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func (s ClickHouseSuite) WeirdTable(tableName string) {
})
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
// now test weird names with rename based resync
ch, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig())
ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig())
require.NoError(s.t, err)
require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf("DROP TABLE `%s`", dstTableName)))
require.NoError(s.t, ch.Close())
Expand All @@ -523,7 +523,7 @@ func (s ClickHouseSuite) WeirdTable(tableName string) {
})
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
// now test weird names with exchange based resync
ch, err = connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig())
ch, err = connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig())
require.NoError(s.t, err)
require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf("TRUNCATE TABLE `%s`", dstTableName)))
require.NoError(s.t, ch.Close())
Expand Down
12 changes: 12 additions & 0 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS",
Description: "Configures max_insert_threads setting on clickhouse for inserting into destination table. Setting left unset when 0",
DefaultValue: "0",
ValueType: protos.DynconfValueType_UINT,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES",
Description: "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely",
Expand Down Expand Up @@ -362,6 +370,10 @@ func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]str
return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE")
}

func PeerDBClickHouseMaxInsertThreads(ctx context.Context, env map[string]string) (int64, error) {
return dynamicConfSigned[int64](ctx, env, "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS")
}

func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error) {
return dynamicConfSigned[int64](ctx, env, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM")
}
Expand Down

0 comments on commit 79732fd

Please sign in to comment.