Skip to content

Commit

Permalink
protocol(ticdc): remove useless code and setup DefaultMaxMessageBytes (
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Dec 26, 2021
1 parent 0c262b7 commit 87f287e
Show file tree
Hide file tree
Showing 9 changed files with 9 additions and 335 deletions.
4 changes: 2 additions & 2 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type Config struct {
func NewConfig() *Config {
return &Config{
Version: "2.4.0",
// MaxMessageBytes will be used to initialize producer, we set the default value (1M) identical to kafka broker.
MaxMessageBytes: 1 * 1024 * 1024,
// MaxMessageBytes will be used to initialize producer.
MaxMessageBytes: config.DefaultMaxMessageBytes,
ReplicationFactor: 1,
Compression: "none",
Credential: &security.Credential{},
Expand Down
7 changes: 7 additions & 0 deletions cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,13 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) {
func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
defer testleak.AfterTest(c)()
ctx := context.Background()
// Test default values.
defaultConfig := NewConfig()
c.Assert(defaultConfig.MaxMessageBytes, check.Equals, 10*1024*1024)
c.Assert(defaultConfig.ReplicationFactor, check.Equals, int16(1))
c.Assert(defaultConfig.Compression, check.Equals, "none")
c.Assert(defaultConfig.AutoCreate, check.Equals, true)
// Exception testing
config := NewConfig()
config.Version = "invalid"
_, err := newSaramaConfigImpl(ctx, config)
Expand Down
5 changes: 0 additions & 5 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,6 @@ error = '''
locate region by id
'''

["CDC:ErrMQSinkUnknownProtocol"]
error = '''
unknown '%s' protocol for Message Queue sink
'''

["CDC:ErrMarshalFailed"]
error = '''
marshal failed
Expand Down
85 changes: 0 additions & 85 deletions pkg/config/mq_sink_protocol.go

This file was deleted.

113 changes: 0 additions & 113 deletions pkg/config/mq_sink_protocol_test.go

This file was deleted.

7 changes: 0 additions & 7 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ package config
// DefaultMaxMessageBytes sets the default value for max-message-bytes
const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M

// ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value.
var ForceEnableOldValueProtocols = []string{
ProtocolCanal.String(),
ProtocolCanalJSON.String(),
ProtocolMaxwell.String(),
}

// SinkConfig represents sink config for a changefeed
type SinkConfig struct {
DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers"`
Expand Down
1 change: 0 additions & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ var (
ErrAsyncBroadcastNotSupport = errors.Normalize("Async broadcasts not supported", errors.RFCCodeText("CDC:ErrAsyncBroadcastNotSupport"))
ErrKafkaInvalidConfig = errors.Normalize("kafka config invalid", errors.RFCCodeText("CDC:ErrKafkaInvalidConfig"))
ErrSinkURIInvalid = errors.Normalize("sink uri invalid", errors.RFCCodeText("CDC:ErrSinkURIInvalid"))
ErrMQSinkUnknownProtocol = errors.Normalize("unknown '%s' protocol for Message Queue sink", errors.RFCCodeText("CDC:ErrMQSinkUnknownProtocol"))
ErrMySQLTxnError = errors.Normalize("MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError"))
ErrMySQLQueryError = errors.Normalize("MySQL query error", errors.RFCCodeText("CDC:ErrMySQLQueryError"))
ErrMySQLConnectionError = errors.Normalize("MySQL connection error", errors.RFCCodeText("CDC:ErrMySQLConnectionError"))
Expand Down
97 changes: 0 additions & 97 deletions pkg/kafka/cluster_admin_client_mock_impl.go

This file was deleted.

25 changes: 0 additions & 25 deletions pkg/kafka/config.go

This file was deleted.

0 comments on commit 87f287e

Please sign in to comment.