utilize type embedding instead (segmentio#1306)
alpergencdev committed Jul 13, 2024
1 parent 8142873 commit 2b805cb
Showing 16 changed files with 175 additions and 194 deletions.
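
The change is mechanical: the package's exported protocol error type is renamed from Error to KafkaError wherever a broker error code is wrapped or unwrapped. A minimal caller-side sketch of what the rename means for user code, assuming KafkaError keeps the old Error behaviour (an error-code type with Temporary and Timeout methods); the writer setup, broker address, and topic are made-up placeholders, not part of this commit:

package main

import (
	"context"
	"errors"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Hypothetical writer; broker address and topic are placeholders.
	w := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), Topic: "events"}
	defer w.Close()

	err := w.WriteMessages(context.Background(), kafka.Message{Value: []byte("hello")})

	var kafkaError kafka.KafkaError // previously: var kafkaError kafka.Error
	if errors.As(err, &kafkaError) && kafkaError.Temporary() {
		log.Println("transient broker error, safe to retry:", kafkaError)
	}
}
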
2 changes: 1 addition & 1 deletion batch.go
@@ -100,7 +100,7 @@ func (batch *Batch) close() (err error) {
conn.mutex.Unlock()

if err != nil {
- var kafkaError Error
+ var kafkaError KafkaError
if !errors.As(err, &kafkaError) && !errors.Is(err, io.ErrShortBuffer) {
conn.Close()
}
2 changes: 1 addition & 1 deletion client_test.go
@@ -48,7 +48,7 @@ func clientCreateTopic(client *Client, topic string, partitions int) error {

// Topic creation seems to be asynchronous. Metadata for the topic partition
// layout in the cluster is available in the controller before being synced
- // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition"
+ // with the other brokers, which causes "KafkaError:[3] Unknown Topic Or Partition"
// when sending requests to the partition leaders.
//
// This loop will wait up to 2 seconds polling the cluster until no errors
2 changes: 1 addition & 1 deletion compress/compress_test.go
@@ -467,7 +467,7 @@ func newLocalClientAndTopic() (*kafka.Client, string, func()) {

// Topic creation seems to be asynchronous. Metadata for the topic partition
// layout in the cluster is available in the controller before being synced
- // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition"
+ // with the other brokers, which causes "KafkaError:[3] Unknown Topic Or Partition"
// when sending requests to the partition leaders.
for i := 0; i < 20; i++ {
r, err := client.Fetch(context.Background(), &kafka.FetchRequest{
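
Both test comments above describe the same workaround for asynchronous topic creation. A hedged sketch of that polling pattern, not copied from either test; the retry count, sleep interval, and helper name waitForTopic are assumptions, and the usual context, time, and kafka-go imports are implied:

// waitForTopic polls until the brokers stop returning
// "Unknown Topic Or Partition" for a freshly created topic.
func waitForTopic(ctx context.Context, client *kafka.Client, topic string) {
	for i := 0; i < 20; i++ {
		r, err := client.Fetch(ctx, &kafka.FetchRequest{
			Topic:     topic,
			Partition: 0,
			Offset:    0,
		})
		if err == nil && r.Error == nil {
			return // metadata has propagated to the partition leader
		}
		time.Sleep(100 * time.Millisecond)
	}
}
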
38 changes: 19 additions & 19 deletions conn.go
@@ -333,7 +333,7 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
return findCoordinatorResponseV0{}, err
}
if response.ErrorCode != 0 {
- return findCoordinatorResponseV0{}, Error(response.ErrorCode)
+ return findCoordinatorResponseV0{}, KafkaError(response.ErrorCode)
}

return response, nil
@@ -359,7 +359,7 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error
return heartbeatResponseV0{}, err
}
if response.ErrorCode != 0 {
- return heartbeatResponseV0{}, Error(response.ErrorCode)
+ return heartbeatResponseV0{}, KafkaError(response.ErrorCode)
}

return response, nil
@@ -385,7 +385,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
return joinGroupResponseV1{}, err
}
if response.ErrorCode != 0 {
- return joinGroupResponseV1{}, Error(response.ErrorCode)
+ return joinGroupResponseV1{}, KafkaError(response.ErrorCode)
}

return response, nil
@@ -411,7 +411,7 @@ func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, er
return leaveGroupResponseV0{}, err
}
if response.ErrorCode != 0 {
- return leaveGroupResponseV0{}, Error(response.ErrorCode)
+ return leaveGroupResponseV0{}, KafkaError(response.ErrorCode)
}

return response, nil
@@ -437,7 +437,7 @@ func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, er
return listGroupsResponseV1{}, err
}
if response.ErrorCode != 0 {
- return listGroupsResponseV1{}, Error(response.ErrorCode)
+ return listGroupsResponseV1{}, KafkaError(response.ErrorCode)
}

return response, nil
@@ -465,7 +465,7 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponse
for _, r := range response.Responses {
for _, pr := range r.PartitionResponses {
if pr.ErrorCode != 0 {
- return offsetCommitResponseV2{}, Error(pr.ErrorCode)
+ return offsetCommitResponseV2{}, KafkaError(pr.ErrorCode)
}
}
}
@@ -496,7 +496,7 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1,
for _, r := range response.Responses {
for _, pr := range r.PartitionResponses {
if pr.ErrorCode != 0 {
- return offsetFetchResponseV1{}, Error(pr.ErrorCode)
+ return offsetFetchResponseV1{}, KafkaError(pr.ErrorCode)
}
}
}
@@ -524,7 +524,7 @@ func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error
return syncGroupResponseV0{}, err
}
if response.ErrorCode != 0 {
- return syncGroupResponseV0{}, Error(response.ErrorCode)
+ return syncGroupResponseV0{}, KafkaError(response.ErrorCode)
}

return response, nil
@@ -549,7 +549,7 @@ func (c *Conn) RemoteAddr() net.Addr {
// It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations fail with a timeout
- // (see type Error) instead of blocking. The deadline applies to all future and
+ // (see type KafkaError) instead of blocking. The deadline applies to all future and
// pending I/O, not just the immediately following call to Read or Write. After
// a deadline has been exceeded, the connection may be closed if it was found to
// be in an unrecoverable state.
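
The doc comment above states that an exceeded deadline surfaces as the package's error type instead of blocking. A hedged sketch of checking for that condition, assuming the renamed KafkaError keeps the old type's Timeout method; the broker address, topic, deadline, and max-bytes value are placeholders:

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "events", 0)
if err != nil {
	log.Fatal(err)
}
defer conn.Close()

// All pending and future reads must complete within 500ms.
conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))

if _, err := conn.ReadMessage(1e6); err != nil {
	var kafkaError kafka.KafkaError
	if errors.As(err, &kafkaError) && kafkaError.Timeout() {
		log.Println("deadline exceeded:", kafkaError)
	}
}
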
@@ -941,7 +941,7 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
return size, err
}
if p.ErrorCode != 0 {
- return size, Error(p.ErrorCode)
+ return size, KafkaError(p.ErrorCode)
}
offset = p.Offset
return size, nil
@@ -1030,7 +1030,7 @@ func (c *Conn) readTopicMetadatav1(brokers map[int32]Broker, topicMetadata []top
// We only report errors if they happened for the topic of
// the connection, otherwise the topic will simply have no
// partitions in the result set.
- return nil, Error(t.TopicErrorCode)
+ return nil, KafkaError(t.TopicErrorCode)
}
for _, p := range t.Partitions {
partitions = append(partitions, Partition{
@@ -1052,7 +1052,7 @@ func (c *Conn) readTopicMetadatav6(brokers map[int32]Broker, topicMetadata []top
// We only report errors if they happened for the topic of
// the connection, otherwise the topic will simply have no
// partitions in the result set.
- return nil, Error(t.TopicErrorCode)
+ return nil, KafkaError(t.TopicErrorCode)
}
for _, p := range t.Partitions {
partitions = append(partitions, Partition{
@@ -1226,7 +1226,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
var p produceResponsePartitionV7
size, err := p.readFrom(r, size)
if err == nil && p.ErrorCode != 0 {
- err = Error(p.ErrorCode)
+ err = KafkaError(p.ErrorCode)
}
if err == nil {
partition = p.Partition
@@ -1238,7 +1238,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
var p produceResponsePartitionV2
size, err := p.readFrom(r, size)
if err == nil && p.ErrorCode != 0 {
- err = Error(p.ErrorCode)
+ err = KafkaError(p.ErrorCode)
}
if err == nil {
partition = p.Partition
@@ -1290,7 +1290,7 @@ func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID
func (c *Conn) readResponse(size int, res interface{}) error {
size, err := read(&c.rbuf, size, res)
if err != nil {
- var kafkaError Error
+ var kafkaError KafkaError
if errors.As(err, &kafkaError) {
size, err = discardN(&c.rbuf, size, size)
}
@@ -1351,7 +1351,7 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func
}

if err = read(deadline, size); err != nil {
- var kafkaError Error
+ var kafkaError KafkaError
if !errors.As(err, &kafkaError) {
c.conn.Close()
}
@@ -1488,7 +1488,7 @@ func (c *Conn) ApiVersions() ([]ApiVersion, error) {
}

if errorCode != 0 {
- return r, Error(errorCode)
+ return r, KafkaError(errorCode)
}

return r, nil
@@ -1588,7 +1588,7 @@ func (c *Conn) saslHandshake(mechanism string) error {
},
)
if err == nil && resp.ErrorCode != 0 {
- err = Error(resp.ErrorCode)
+ err = KafkaError(resp.ErrorCode)
}
return err
}
@@ -1620,7 +1620,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
},
)
if err == nil && response.ErrorCode != 0 {
- err = Error(response.ErrorCode)
+ err = KafkaError(response.ErrorCode)
}
return response.Data, err
}
32 changes: 16 additions & 16 deletions consumergroup.go
@@ -365,7 +365,7 @@ func (g *Generation) close() {
//
// The provided function MUST support cancellation via the ctx argument and exit
// in a timely manner once the ctx is complete. When the context is closed, the
- // context's Error() function will return ErrGenerationEnded.
+ // context's KafkaError() function will return ErrGenerationEnded.
//
// When closing out a generation, the consumer group will wait for all functions
// launched by Start to exit before the group can move on and join the next
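
A hedged sketch of the Start contract described above: the callback watches ctx and returns promptly once the generation ends. Here gen is assumed to be a *kafka.Generation obtained from (*ConsumerGroup).Next, and the ticker body is made up:

gen.Start(func(ctx context.Context) {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// The context reports ErrGenerationEnded when the group rebalances.
			if errors.Is(ctx.Err(), kafka.ErrGenerationEnded) {
				log.Println("generation ended, exiting cleanly")
			}
			return
		case <-ticker.C:
			// periodic work scoped to this generation
		}
	}
})
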
@@ -536,7 +536,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
g.logError(func(l Logger) {
l.Printf("Problem getting partitions while checking for changes, %v", err)
})
- var kafkaError Error
+ var kafkaError KafkaError
if errors.As(err, &kafkaError) {
continue
}
@@ -910,7 +910,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
CoordinatorKey: cg.config.ID,
})
if err == nil && out.ErrorCode != 0 {
- err = Error(out.ErrorCode)
+ err = KafkaError(out.ErrorCode)
}
if err != nil {
return nil, err
@@ -925,12 +925,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
- // * GroupLoadInProgress:
- // * GroupCoordinatorNotAvailable:
- // * NotCoordinatorForGroup:
- // * InconsistentGroupProtocol:
- // * InvalidSessionTimeout:
- // * GroupAuthorizationFailed:
+ // - GroupLoadInProgress:
+ // - GroupCoordinatorNotAvailable:
+ // - NotCoordinatorForGroup:
+ // - InconsistentGroupProtocol:
+ // - InvalidSessionTimeout:
+ // - GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequestV1(memberID)
if err != nil {
@@ -939,7 +939,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i

response, err := conn.joinGroup(request)
if err == nil && response.ErrorCode != 0 {
- err = Error(response.ErrorCode)
+ err = KafkaError(response.ErrorCode)
}
if err != nil {
return "", 0, nil, err
@@ -1073,16 +1073,16 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
// Readers subscriptions topic => partitions
//
// Possible kafka error codes returned:
- // * GroupCoordinatorNotAvailable:
- // * NotCoordinatorForGroup:
- // * IllegalGeneration:
- // * RebalanceInProgress:
- // * GroupAuthorizationFailed:
+ // - GroupCoordinatorNotAvailable:
+ // - NotCoordinatorForGroup:
+ // - IllegalGeneration:
+ // - RebalanceInProgress:
+ // - GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
response, err := conn.syncGroup(request)
if err == nil && response.ErrorCode != 0 {
- err = Error(response.ErrorCode)
+ err = KafkaError(response.ErrorCode)
}
if err != nil {
return nil, err
2 changes: 1 addition & 1 deletion createtopics.go
@@ -365,7 +365,7 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse
continue
}
if tr.ErrorCode != 0 {
- return response, Error(tr.ErrorCode)
+ return response, KafkaError(tr.ErrorCode)
}
}

4 changes: 2 additions & 2 deletions deletetopics.go
@@ -59,7 +59,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D
if t.ErrorCode == 0 {
ret.Errors[t.Name] = nil
} else {
- ret.Errors[t.Name] = Error(t.ErrorCode)
+ ret.Errors[t.Name] = KafkaError(t.ErrorCode)
}
}

@@ -168,7 +168,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse
}
for _, c := range response.TopicErrorCodes {
if c.ErrorCode != 0 {
- return response, Error(c.ErrorCode)
+ return response, KafkaError(c.ErrorCode)
}
}
return response, nil
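
A hedged caller-side sketch for the DeleteTopics change above: the per-topic Errors map carries the renamed error type, which is assumed to remain an integer error-code type as the old Error was. The broker address and topic name are placeholders:

client := &kafka.Client{Addr: kafka.TCP("localhost:9092")}

res, err := client.DeleteTopics(context.Background(), &kafka.DeleteTopicsRequest{
	Topics: []string{"orders"},
})
if err != nil {
	log.Fatal(err)
}

for topic, topicErr := range res.Errors {
	var kafkaError kafka.KafkaError
	if topicErr != nil && errors.As(topicErr, &kafkaError) {
		// Assumes KafkaError converts to its underlying broker error code.
		log.Printf("delete %q failed with broker error code %d", topic, int(kafkaError))
	}
}
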