-
Notifications
You must be signed in to change notification settings - Fork 792
/
alterconfigs.go
107 lines (87 loc) · 2.75 KB
/
alterconfigs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/alterconfigs"
)
// AlterConfigsRequest represents a request sent to a kafka broker to alter configs.
type AlterConfigsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// List of resources to update.
Resources []AlterConfigRequestResource
// When set to true, topics are not created but the configuration is
// validated as if they were.
ValidateOnly bool
}
type AlterConfigRequestResource struct {
// Resource Type
ResourceType ResourceType
// Resource Name
ResourceName string
// Configs is a list of configuration updates.
Configs []AlterConfigRequestConfig
}
type AlterConfigRequestConfig struct {
// Configuration key name
Name string
// The value to set for the configuration key.
Value string
}
// AlterConfigsResponse represents a response from a kafka broker to an alter config request.
type AlterConfigsResponse struct {
// Duration for which the request was throttled due to a quota violation.
Throttle time.Duration
// Mapping of topic names to errors that occurred while attempting to create
// the topics.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Errors map[AlterConfigsResponseResource]error
}
// AlterConfigsResponseResource helps map errors to specific resources in an
// alter config response.
type AlterConfigsResponseResource struct {
Type int8
Name string
}
// AlterConfigs sends a config altering request to a kafka broker and returns the
// response.
func (c *Client) AlterConfigs(ctx context.Context, req *AlterConfigsRequest) (*AlterConfigsResponse, error) {
resources := make([]alterconfigs.RequestResources, len(req.Resources))
for i, t := range req.Resources {
configs := make([]alterconfigs.RequestConfig, len(t.Configs))
for j, v := range t.Configs {
configs[j] = alterconfigs.RequestConfig{
Name: v.Name,
Value: v.Value,
}
}
resources[i] = alterconfigs.RequestResources{
ResourceType: int8(t.ResourceType),
ResourceName: t.ResourceName,
Configs: configs,
}
}
m, err := c.roundTrip(ctx, req.Addr, &alterconfigs.Request{
Resources: resources,
ValidateOnly: req.ValidateOnly,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).AlterConfigs: %w", err)
}
res := m.(*alterconfigs.Response)
ret := &AlterConfigsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Errors: make(map[AlterConfigsResponseResource]error, len(res.Responses)),
}
for _, t := range res.Responses {
ret.Errors[AlterConfigsResponseResource{
Type: t.ResourceType,
Name: t.ResourceName,
}] = makeError(t.ErrorCode, t.ErrorMessage)
}
return ret, nil
}