From 2b805cbe34efc26b1dbb17a8e2e3a762fa34c6a1 Mon Sep 17 00:00:00 2001 From: alpergencdev Date: Sat, 13 Jul 2024 19:38:55 +0300 Subject: [PATCH] utilize type embedding instead (#1306) --- batch.go | 2 +- client_test.go | 2 +- compress/compress_test.go | 2 +- conn.go | 38 +++--- consumergroup.go | 32 ++--- createtopics.go | 2 +- deletetopics.go | 4 +- error.go | 261 +++++++++++++++++-------------------- error_test.go | 4 +- heartbeat.go | 2 +- listoffset.go | 2 +- read.go | 8 +- reader.go | 2 +- topics/list_topics_test.go | 2 +- transport.go | 4 +- writer.go | 2 +- 16 files changed, 175 insertions(+), 194 deletions(-) diff --git a/batch.go b/batch.go index 19dcef8cd..f9bc9f224 100644 --- a/batch.go +++ b/batch.go @@ -100,7 +100,7 @@ func (batch *Batch) close() (err error) { conn.mutex.Unlock() if err != nil { - var kafkaError Error + var kafkaError KafkaError if !errors.As(err, &kafkaError) && !errors.Is(err, io.ErrShortBuffer) { conn.Close() } diff --git a/client_test.go b/client_test.go index 62153b234..c0895ca8c 100644 --- a/client_test.go +++ b/client_test.go @@ -48,7 +48,7 @@ func clientCreateTopic(client *Client, topic string, partitions int) error { // Topic creation seems to be asynchronous. Metadata for the topic partition // layout in the cluster is available in the controller before being synced - // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition" + // with the other brokers, which causes "KafkaError:[3] Unknown Topic Or Partition" // when sending requests to the partition leaders. // // This loop will wait up to 2 seconds polling the cluster until no errors diff --git a/compress/compress_test.go b/compress/compress_test.go index 1da841227..de7ab673b 100644 --- a/compress/compress_test.go +++ b/compress/compress_test.go @@ -467,7 +467,7 @@ func newLocalClientAndTopic() (*kafka.Client, string, func()) { // Topic creation seems to be asynchronous. Metadata for the topic partition // layout in the cluster is available in the controller before being synced - // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition" + // with the other brokers, which causes "KafkaError:[3] Unknown Topic Or Partition" // when sending requests to the partition leaders. 
for i := 0; i < 20; i++ { r, err := client.Fetch(context.Background(), &kafka.FetchRequest{ diff --git a/conn.go b/conn.go index 2b51afbd5..8fab33bf4 100644 --- a/conn.go +++ b/conn.go @@ -333,7 +333,7 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato return findCoordinatorResponseV0{}, err } if response.ErrorCode != 0 { - return findCoordinatorResponseV0{}, Error(response.ErrorCode) + return findCoordinatorResponseV0{}, KafkaError(response.ErrorCode) } return response, nil @@ -359,7 +359,7 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error return heartbeatResponseV0{}, err } if response.ErrorCode != 0 { - return heartbeatResponseV0{}, Error(response.ErrorCode) + return heartbeatResponseV0{}, KafkaError(response.ErrorCode) } return response, nil @@ -385,7 +385,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error return joinGroupResponseV1{}, err } if response.ErrorCode != 0 { - return joinGroupResponseV1{}, Error(response.ErrorCode) + return joinGroupResponseV1{}, KafkaError(response.ErrorCode) } return response, nil @@ -411,7 +411,7 @@ func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, er return leaveGroupResponseV0{}, err } if response.ErrorCode != 0 { - return leaveGroupResponseV0{}, Error(response.ErrorCode) + return leaveGroupResponseV0{}, KafkaError(response.ErrorCode) } return response, nil @@ -437,7 +437,7 @@ func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, er return listGroupsResponseV1{}, err } if response.ErrorCode != 0 { - return listGroupsResponseV1{}, Error(response.ErrorCode) + return listGroupsResponseV1{}, KafkaError(response.ErrorCode) } return response, nil @@ -465,7 +465,7 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponse for _, r := range response.Responses { for _, pr := range r.PartitionResponses { if pr.ErrorCode != 0 { - return offsetCommitResponseV2{}, Error(pr.ErrorCode) + return offsetCommitResponseV2{}, KafkaError(pr.ErrorCode) } } } @@ -496,7 +496,7 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1, for _, r := range response.Responses { for _, pr := range r.PartitionResponses { if pr.ErrorCode != 0 { - return offsetFetchResponseV1{}, Error(pr.ErrorCode) + return offsetFetchResponseV1{}, KafkaError(pr.ErrorCode) } } } @@ -524,7 +524,7 @@ func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error return syncGroupResponseV0{}, err } if response.ErrorCode != 0 { - return syncGroupResponseV0{}, Error(response.ErrorCode) + return syncGroupResponseV0{}, KafkaError(response.ErrorCode) } return response, nil @@ -549,7 +549,7 @@ func (c *Conn) RemoteAddr() net.Addr { // It is equivalent to calling both SetReadDeadline and SetWriteDeadline. // // A deadline is an absolute time after which I/O operations fail with a timeout -// (see type Error) instead of blocking. The deadline applies to all future and +// (see type KafkaError) instead of blocking. The deadline applies to all future and // pending I/O, not just the immediately following call to Read or Write. After // a deadline has been exceeded, the connection may be closed if it was found to // be in an unrecoverable state. 
@@ -941,7 +941,7 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) { return size, err } if p.ErrorCode != 0 { - return size, Error(p.ErrorCode) + return size, KafkaError(p.ErrorCode) } offset = p.Offset return size, nil @@ -1030,7 +1030,7 @@ func (c *Conn) readTopicMetadatav1(brokers map[int32]Broker, topicMetadata []top // We only report errors if they happened for the topic of // the connection, otherwise the topic will simply have no // partitions in the result set. - return nil, Error(t.TopicErrorCode) + return nil, KafkaError(t.TopicErrorCode) } for _, p := range t.Partitions { partitions = append(partitions, Partition{ @@ -1052,7 +1052,7 @@ func (c *Conn) readTopicMetadatav6(brokers map[int32]Broker, topicMetadata []top // We only report errors if they happened for the topic of // the connection, otherwise the topic will simply have no // partitions in the result set. - return nil, Error(t.TopicErrorCode) + return nil, KafkaError(t.TopicErrorCode) } for _, p := range t.Partitions { partitions = append(partitions, Partition{ @@ -1226,7 +1226,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) var p produceResponsePartitionV7 size, err := p.readFrom(r, size) if err == nil && p.ErrorCode != 0 { - err = Error(p.ErrorCode) + err = KafkaError(p.ErrorCode) } if err == nil { partition = p.Partition @@ -1238,7 +1238,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) var p produceResponsePartitionV2 size, err := p.readFrom(r, size) if err == nil && p.ErrorCode != 0 { - err = Error(p.ErrorCode) + err = KafkaError(p.ErrorCode) } if err == nil { partition = p.Partition @@ -1290,7 +1290,7 @@ func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID func (c *Conn) readResponse(size int, res interface{}) error { size, err := read(&c.rbuf, size, res) if err != nil { - var kafkaError Error + var kafkaError KafkaError if errors.As(err, &kafkaError) { size, err = discardN(&c.rbuf, size, size) } @@ -1351,7 +1351,7 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func } if err = read(deadline, size); err != nil { - var kafkaError Error + var kafkaError KafkaError if !errors.As(err, &kafkaError) { c.conn.Close() } @@ -1488,7 +1488,7 @@ func (c *Conn) ApiVersions() ([]ApiVersion, error) { } if errorCode != 0 { - return r, Error(errorCode) + return r, KafkaError(errorCode) } return r, nil @@ -1588,7 +1588,7 @@ func (c *Conn) saslHandshake(mechanism string) error { }, ) if err == nil && resp.ErrorCode != 0 { - err = Error(resp.ErrorCode) + err = KafkaError(resp.ErrorCode) } return err } @@ -1620,7 +1620,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) { }, ) if err == nil && response.ErrorCode != 0 { - err = Error(response.ErrorCode) + err = KafkaError(response.ErrorCode) } return response.Data, err } diff --git a/consumergroup.go b/consumergroup.go index f4bb382cb..f75c6dbdb 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -365,7 +365,7 @@ func (g *Generation) close() { // // The provided function MUST support cancellation via the ctx argument and exit // in a timely manner once the ctx is complete. When the context is closed, the // context's Error() function will return ErrGenerationEnded.
// // When closing out a generation, the consumer group will wait for all functions // launched by Start to exit before the group can move on and join the next @@ -536,7 +536,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { g.logError(func(l Logger) { l.Printf("Problem getting partitions while checking for changes, %v", err) }) - var kafkaError Error + var kafkaError KafkaError if errors.As(err, &kafkaError) { continue } @@ -910,7 +910,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { CoordinatorKey: cg.config.ID, }) if err == nil && out.ErrorCode != 0 { - err = Error(out.ErrorCode) + err = KafkaError(out.ErrorCode) } if err != nil { return nil, err @@ -925,12 +925,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // the leader. Otherwise, GroupMemberAssignments will be nil. // // Possible kafka error codes returned: -// * GroupLoadInProgress: -// * GroupCoordinatorNotAvailable: -// * NotCoordinatorForGroup: -// * InconsistentGroupProtocol: -// * InvalidSessionTimeout: -// * GroupAuthorizationFailed: +// - GroupLoadInProgress: +// - GroupCoordinatorNotAvailable: +// - NotCoordinatorForGroup: +// - InconsistentGroupProtocol: +// - InvalidSessionTimeout: +// - GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { request, err := cg.makeJoinGroupRequestV1(memberID) if err != nil { @@ -939,7 +939,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i response, err := conn.joinGroup(request) if err == nil && response.ErrorCode != 0 { - err = Error(response.ErrorCode) + err = KafkaError(response.ErrorCode) } if err != nil { return "", 0, nil, err @@ -1073,16 +1073,16 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember // Readers subscriptions topic => partitions // // Possible kafka error codes returned: -// * GroupCoordinatorNotAvailable: -// * NotCoordinatorForGroup: -// * IllegalGeneration: -// * RebalanceInProgress: -// * GroupAuthorizationFailed: +// - GroupCoordinatorNotAvailable: +// - NotCoordinatorForGroup: +// - IllegalGeneration: +// - RebalanceInProgress: +// - GroupAuthorizationFailed: func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) { request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments) response, err := conn.syncGroup(request) if err == nil && response.ErrorCode != 0 { - err = Error(response.ErrorCode) + err = KafkaError(response.ErrorCode) } if err != nil { return nil, err diff --git a/createtopics.go b/createtopics.go index 8ad9ebf44..a3b4ed2e0 100644 --- a/createtopics.go +++ b/createtopics.go @@ -365,7 +365,7 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse continue } if tr.ErrorCode != 0 { - return response, Error(tr.ErrorCode) + return response, KafkaError(tr.ErrorCode) } } diff --git a/deletetopics.go b/deletetopics.go index d758d9fd6..2ea08bf1a 100644 --- a/deletetopics.go +++ b/deletetopics.go @@ -59,7 +59,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D if t.ErrorCode == 0 { ret.Errors[t.Name] = nil } else { - ret.Errors[t.Name] = Error(t.ErrorCode) + ret.Errors[t.Name] = KafkaError(t.ErrorCode) } } @@ -168,7 +168,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse } for _, c := range response.TopicErrorCodes { if 
c.ErrorCode != 0 { - return response, Error(c.ErrorCode) + return response, KafkaError(c.ErrorCode) } } return response, nil diff --git a/error.go b/error.go index 9d3a4cf5f..ebc57c574 100644 --- a/error.go +++ b/error.go @@ -7,127 +7,127 @@ import ( "syscall" ) -// Error represents the different error codes that may be returned by kafka. +// KafkaError represents the different error codes that may be returned by kafka. // https://kafka.apache.org/protocol#protocol_error_codes -type Error int +type KafkaError int const ( - Unknown Error = -1 - OffsetOutOfRange Error = 1 - InvalidMessage Error = 2 - UnknownTopicOrPartition Error = 3 - InvalidMessageSize Error = 4 - LeaderNotAvailable Error = 5 - NotLeaderForPartition Error = 6 - RequestTimedOut Error = 7 - BrokerNotAvailable Error = 8 - ReplicaNotAvailable Error = 9 - MessageSizeTooLarge Error = 10 - StaleControllerEpoch Error = 11 - OffsetMetadataTooLarge Error = 12 - NetworkException Error = 13 - GroupLoadInProgress Error = 14 - GroupCoordinatorNotAvailable Error = 15 - NotCoordinatorForGroup Error = 16 - InvalidTopic Error = 17 - RecordListTooLarge Error = 18 - NotEnoughReplicas Error = 19 - NotEnoughReplicasAfterAppend Error = 20 - InvalidRequiredAcks Error = 21 - IllegalGeneration Error = 22 - InconsistentGroupProtocol Error = 23 - InvalidGroupId Error = 24 - UnknownMemberId Error = 25 - InvalidSessionTimeout Error = 26 - RebalanceInProgress Error = 27 - InvalidCommitOffsetSize Error = 28 - TopicAuthorizationFailed Error = 29 - GroupAuthorizationFailed Error = 30 - ClusterAuthorizationFailed Error = 31 - InvalidTimestamp Error = 32 - UnsupportedSASLMechanism Error = 33 - IllegalSASLState Error = 34 - UnsupportedVersion Error = 35 - TopicAlreadyExists Error = 36 - InvalidPartitionNumber Error = 37 - InvalidReplicationFactor Error = 38 - InvalidReplicaAssignment Error = 39 - InvalidConfiguration Error = 40 - NotController Error = 41 - InvalidRequest Error = 42 - UnsupportedForMessageFormat Error = 43 - PolicyViolation Error = 44 - OutOfOrderSequenceNumber Error = 45 - DuplicateSequenceNumber Error = 46 - InvalidProducerEpoch Error = 47 - InvalidTransactionState Error = 48 - InvalidProducerIDMapping Error = 49 - InvalidTransactionTimeout Error = 50 - ConcurrentTransactions Error = 51 - TransactionCoordinatorFenced Error = 52 - TransactionalIDAuthorizationFailed Error = 53 - SecurityDisabled Error = 54 - BrokerAuthorizationFailed Error = 55 - KafkaStorageError Error = 56 - LogDirNotFound Error = 57 - SASLAuthenticationFailed Error = 58 - UnknownProducerId Error = 59 - ReassignmentInProgress Error = 60 - DelegationTokenAuthDisabled Error = 61 - DelegationTokenNotFound Error = 62 - DelegationTokenOwnerMismatch Error = 63 - DelegationTokenRequestNotAllowed Error = 64 - DelegationTokenAuthorizationFailed Error = 65 - DelegationTokenExpired Error = 66 - InvalidPrincipalType Error = 67 - NonEmptyGroup Error = 68 - GroupIdNotFound Error = 69 - FetchSessionIDNotFound Error = 70 - InvalidFetchSessionEpoch Error = 71 - ListenerNotFound Error = 72 - TopicDeletionDisabled Error = 73 - FencedLeaderEpoch Error = 74 - UnknownLeaderEpoch Error = 75 - UnsupportedCompressionType Error = 76 - StaleBrokerEpoch Error = 77 - OffsetNotAvailable Error = 78 - MemberIDRequired Error = 79 - PreferredLeaderNotAvailable Error = 80 - GroupMaxSizeReached Error = 81 - FencedInstanceID Error = 82 - EligibleLeadersNotAvailable Error = 83 - ElectionNotNeeded Error = 84 - NoReassignmentInProgress Error = 85 - GroupSubscribedToTopic Error = 86 - InvalidRecord Error = 87 - 
UnstableOffsetCommit Error = 88 - ThrottlingQuotaExceeded Error = 89 - ProducerFenced Error = 90 - ResourceNotFound Error = 91 - DuplicateResource Error = 92 - UnacceptableCredential Error = 93 - InconsistentVoterSet Error = 94 - InvalidUpdateVersion Error = 95 - FeatureUpdateFailed Error = 96 - PrincipalDeserializationFailure Error = 97 - SnapshotNotFound Error = 98 - PositionOutOfRange Error = 99 - UnknownTopicID Error = 100 - DuplicateBrokerRegistration Error = 101 - BrokerIDNotRegistered Error = 102 - InconsistentTopicID Error = 103 - InconsistentClusterID Error = 104 - TransactionalIDNotFound Error = 105 - FetchSessionTopicIDError Error = 106 + Unknown KafkaError = -1 + OffsetOutOfRange KafkaError = 1 + InvalidMessage KafkaError = 2 + UnknownTopicOrPartition KafkaError = 3 + InvalidMessageSize KafkaError = 4 + LeaderNotAvailable KafkaError = 5 + NotLeaderForPartition KafkaError = 6 + RequestTimedOut KafkaError = 7 + BrokerNotAvailable KafkaError = 8 + ReplicaNotAvailable KafkaError = 9 + MessageSizeTooLarge KafkaError = 10 + StaleControllerEpoch KafkaError = 11 + OffsetMetadataTooLarge KafkaError = 12 + NetworkException KafkaError = 13 + GroupLoadInProgress KafkaError = 14 + GroupCoordinatorNotAvailable KafkaError = 15 + NotCoordinatorForGroup KafkaError = 16 + InvalidTopic KafkaError = 17 + RecordListTooLarge KafkaError = 18 + NotEnoughReplicas KafkaError = 19 + NotEnoughReplicasAfterAppend KafkaError = 20 + InvalidRequiredAcks KafkaError = 21 + IllegalGeneration KafkaError = 22 + InconsistentGroupProtocol KafkaError = 23 + InvalidGroupId KafkaError = 24 + UnknownMemberId KafkaError = 25 + InvalidSessionTimeout KafkaError = 26 + RebalanceInProgress KafkaError = 27 + InvalidCommitOffsetSize KafkaError = 28 + TopicAuthorizationFailed KafkaError = 29 + GroupAuthorizationFailed KafkaError = 30 + ClusterAuthorizationFailed KafkaError = 31 + InvalidTimestamp KafkaError = 32 + UnsupportedSASLMechanism KafkaError = 33 + IllegalSASLState KafkaError = 34 + UnsupportedVersion KafkaError = 35 + TopicAlreadyExists KafkaError = 36 + InvalidPartitionNumber KafkaError = 37 + InvalidReplicationFactor KafkaError = 38 + InvalidReplicaAssignment KafkaError = 39 + InvalidConfiguration KafkaError = 40 + NotController KafkaError = 41 + InvalidRequest KafkaError = 42 + UnsupportedForMessageFormat KafkaError = 43 + PolicyViolation KafkaError = 44 + OutOfOrderSequenceNumber KafkaError = 45 + DuplicateSequenceNumber KafkaError = 46 + InvalidProducerEpoch KafkaError = 47 + InvalidTransactionState KafkaError = 48 + InvalidProducerIDMapping KafkaError = 49 + InvalidTransactionTimeout KafkaError = 50 + ConcurrentTransactions KafkaError = 51 + TransactionCoordinatorFenced KafkaError = 52 + TransactionalIDAuthorizationFailed KafkaError = 53 + SecurityDisabled KafkaError = 54 + BrokerAuthorizationFailed KafkaError = 55 + KafkaStorageError KafkaError = 56 + LogDirNotFound KafkaError = 57 + SASLAuthenticationFailed KafkaError = 58 + UnknownProducerId KafkaError = 59 + ReassignmentInProgress KafkaError = 60 + DelegationTokenAuthDisabled KafkaError = 61 + DelegationTokenNotFound KafkaError = 62 + DelegationTokenOwnerMismatch KafkaError = 63 + DelegationTokenRequestNotAllowed KafkaError = 64 + DelegationTokenAuthorizationFailed KafkaError = 65 + DelegationTokenExpired KafkaError = 66 + InvalidPrincipalType KafkaError = 67 + NonEmptyGroup KafkaError = 68 + GroupIdNotFound KafkaError = 69 + FetchSessionIDNotFound KafkaError = 70 + InvalidFetchSessionEpoch KafkaError = 71 + ListenerNotFound KafkaError = 72 + 
TopicDeletionDisabled KafkaError = 73 + FencedLeaderEpoch KafkaError = 74 + UnknownLeaderEpoch KafkaError = 75 + UnsupportedCompressionType KafkaError = 76 + StaleBrokerEpoch KafkaError = 77 + OffsetNotAvailable KafkaError = 78 + MemberIDRequired KafkaError = 79 + PreferredLeaderNotAvailable KafkaError = 80 + GroupMaxSizeReached KafkaError = 81 + FencedInstanceID KafkaError = 82 + EligibleLeadersNotAvailable KafkaError = 83 + ElectionNotNeeded KafkaError = 84 + NoReassignmentInProgress KafkaError = 85 + GroupSubscribedToTopic KafkaError = 86 + InvalidRecord KafkaError = 87 + UnstableOffsetCommit KafkaError = 88 + ThrottlingQuotaExceeded KafkaError = 89 + ProducerFenced KafkaError = 90 + ResourceNotFound KafkaError = 91 + DuplicateResource KafkaError = 92 + UnacceptableCredential KafkaError = 93 + InconsistentVoterSet KafkaError = 94 + InvalidUpdateVersion KafkaError = 95 + FeatureUpdateFailed KafkaError = 96 + PrincipalDeserializationFailure KafkaError = 97 + SnapshotNotFound KafkaError = 98 + PositionOutOfRange KafkaError = 99 + UnknownTopicID KafkaError = 100 + DuplicateBrokerRegistration KafkaError = 101 + BrokerIDNotRegistered KafkaError = 102 + InconsistentTopicID KafkaError = 103 + InconsistentClusterID KafkaError = 104 + TransactionalIDNotFound KafkaError = 105 + FetchSessionTopicIDError KafkaError = 106 ) -// Error satisfies the error interface. -func (e Error) Error() string { +// KafkaError satisfies the error interface. +func (e KafkaError) Error() string { return fmt.Sprintf("[%d] %s: %s", e, e.Title(), e.Description()) } // Timeout returns true if the error was due to a timeout. -func (e Error) Timeout() bool { +func (e KafkaError) Timeout() bool { return e == RequestTimedOut } @@ -135,7 +135,7 @@ func (e Error) Timeout() bool { // if retried at a later time. // Kafka error documentation specifies these as "retriable" // https://kafka.apache.org/protocol#protocol_error_codes -func (e Error) Temporary() bool { +func (e KafkaError) Temporary() bool { switch e { case InvalidMessage, UnknownTopicOrPartition, @@ -173,7 +173,7 @@ func (e Error) Temporary() bool { } // Title returns a human readable title for the error. -func (e Error) Title() string { +func (e KafkaError) Title() string { switch e { case Unknown: return "Unknown" @@ -376,13 +376,13 @@ func (e Error) Title() string { case TransactionalIDNotFound: return "Transactional ID Not Found" case FetchSessionTopicIDError: return "Fetch Session Topic ID Error" } return "" } // Description returns a human readable description of cause of the error.
-func (e Error) Description() string { +func (e KafkaError) Description() string { switch e { case Unknown: return "an unexpected server error occurred" @@ -656,16 +656,13 @@ func (e MessageTooLargeError) Error() string { } type UnknownTopicOrPartitionError struct { + KafkaError Topic string Partition *int } func unknownTopicOrPartition(topic string, partition *int) UnknownTopicOrPartitionError { - return UnknownTopicOrPartitionError{Topic: topic, Partition: partition} -} - -func (e UnknownTopicOrPartitionError) Error() string { - return UnknownTopicOrPartition.Error() + return UnknownTopicOrPartitionError{KafkaError: UnknownTopicOrPartition, Topic: topic, Partition: partition} } func (e UnknownTopicOrPartitionError) Is(err error) bool { @@ -677,30 +674,14 @@ func (e UnknownTopicOrPartitionError) Is(err error) bool { return errors.As(err, &unknownTopicOrPartitionError) } -func (e UnknownTopicOrPartitionError) Title() string { - return UnknownTopicOrPartition.Title() -} - -func (e UnknownTopicOrPartitionError) Description() string { - return UnknownTopicOrPartition.Description() -} - -func (e UnknownTopicOrPartitionError) Timeout() bool { - return UnknownTopicOrPartition.Timeout() -} - -func (e UnknownTopicOrPartitionError) Temporary() bool { - return UnknownTopicOrPartition.Temporary() -} - func makeError(code int16, message string) error { if code == 0 { return nil } if message == "" { - return Error(code) + return KafkaError(code) } - return fmt.Errorf("%w: %s", Error(code), message) + return fmt.Errorf("%w: %s", KafkaError(code), message) } // WriteError is returned by kafka.(*Writer).WriteMessages when the writer is diff --git a/error_test.go b/error_test.go index 3d461154c..36a1ca589 100644 --- a/error_test.go +++ b/error_test.go @@ -6,7 +6,7 @@ import ( ) func TestError(t *testing.T) { - errorCodes := []Error{ + errorCodes := []KafkaError{ Unknown, OffsetOutOfRange, InvalidMessage, @@ -100,7 +100,7 @@ func TestError(t *testing.T) { } t.Run("verify that an invalid error code has an empty title and description", func(t *testing.T) { - err := Error(-2) + err := KafkaError(-2) if s := err.Title(); len(s) != 0 { t.Error("non-empty title:", s) diff --git a/heartbeat.go b/heartbeat.go index a0444dae1..6aca2f2f8 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -70,7 +70,7 @@ func (c *Client) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*Heartbe } if res.ErrorCode != 0 { - ret.Error = Error(res.ErrorCode) + ret.Error = KafkaError(res.ErrorCode) } return ret, nil diff --git a/listoffset.go b/listoffset.go index 11c5d04b4..cd6e9ea8d 100644 --- a/listoffset.go +++ b/listoffset.go @@ -169,7 +169,7 @@ func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*Lis } if p.ErrorCode != 0 { - partition.Error = Error(p.ErrorCode) + partition.Error = KafkaError(p.ErrorCode) } partitionOffsets[key] = partition diff --git a/read.go b/read.go index ec2b38527..f7b115881 100644 --- a/read.go +++ b/read.go @@ -351,7 +351,7 @@ func readFetchResponseHeaderV2(r *bufio.Reader, size int) (throttle int32, water } if p.ErrorCode != 0 { - err = Error(p.ErrorCode) + err = KafkaError(p.ErrorCode) return } @@ -436,7 +436,7 @@ func readFetchResponseHeaderV5(r *bufio.Reader, size int) (throttle int32, water } if p.ErrorCode != 0 { - err = Error(p.ErrorCode) + err = KafkaError(p.ErrorCode) return } @@ -482,7 +482,7 @@ func readFetchResponseHeaderV10(r *bufio.Reader, size int) (throttle int32, wate return } if errorCode != 0 { - err = Error(errorCode) + err = KafkaError(errorCode) return } @@ 
-540,7 +540,7 @@ func readFetchResponseHeaderV10(r *bufio.Reader, size int) (throttle int32, wate } if p.ErrorCode != 0 { - err = Error(p.ErrorCode) + err = KafkaError(p.ErrorCode) return } diff --git a/reader.go b/reader.go index 04d90f355..ca2069435 100644 --- a/reader.go +++ b/reader.go @@ -1421,7 +1421,7 @@ func (r *reader) run(ctx context.Context, offset int64) { break readLoop default: - var kafkaError Error + var kafkaError KafkaError if errors.As(err, &kafkaError) { r.sendError(ctx, err) } else { diff --git a/topics/list_topics_test.go b/topics/list_topics_test.go index 2120c278a..3a1db5737 100644 --- a/topics/list_topics_test.go +++ b/topics/list_topics_test.go @@ -73,7 +73,7 @@ func clientCreateTopic(client *kafka.Client, topic string, partitions int) error // Topic creation seems to be asynchronous. Metadata for the topic partition // layout in the cluster is available in the controller before being synced - // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition" + // with the other brokers, which causes "KafkaError:[3] Unknown Topic Or Partition" // when sending requests to the partition leaders. // // This loop will wait up to 2 seconds polling the cluster until no errors diff --git a/transport.go b/transport.go index 685bdddb1..cd4aec478 100644 --- a/transport.go +++ b/transport.go @@ -1200,7 +1200,7 @@ func (g *connGroup) connect(ctx context.Context, addr net.Addr) (*conn, error) { ver := make(map[protocol.ApiKey]int16, len(res.ApiKeys)) if res.ErrorCode != 0 { - return nil, fmt.Errorf("negotating API versions with kafka broker at %s: %w", g.addr, Error(res.ErrorCode)) + return nil, fmt.Errorf("negotating API versions with kafka broker at %s: %w", g.addr, KafkaError(res.ErrorCode)) } for _, r := range res.ApiKeys { @@ -1337,7 +1337,7 @@ func saslHandshakeRoundTrip(pc *protocol.Conn, mechanism string) error { } res := msg.(*saslhandshake.Response) if res.ErrorCode != 0 { - err = Error(res.ErrorCode) + err = KafkaError(res.ErrorCode) } return err } diff --git a/writer.go b/writer.go index a0cf073ac..9344ea2e3 100644 --- a/writer.go +++ b/writer.go @@ -760,7 +760,7 @@ func (w *Writer) partitions(ctx context.Context, topic string) (int, error) { if t.Name == topic { // This should always hit, unless kafka has a bug. if t.ErrorCode != 0 { - return 0, Error(t.ErrorCode) + return 0, KafkaError(t.ErrorCode) } return len(t.Partitions), nil }
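
The substantive change is in error.go: UnknownTopicOrPartitionError now embeds KafkaError and drops its hand-written Error, Title, Description, Timeout, and Temporary delegations, relying on Go method promotion instead. Below is a minimal, self-contained sketch of that pattern; the names mirror the patch but the bodies and the sample topic are simplified placeholders, not the library's real definitions.

package main

import "fmt"

// KafkaError stands in for the protocol error-code type renamed by this patch.
type KafkaError int

const UnknownTopicOrPartition KafkaError = 3

func (e KafkaError) Error() string   { return fmt.Sprintf("[%d] Unknown Topic Or Partition", int(e)) }
func (e KafkaError) Title() string   { return "Unknown Topic Or Partition" }
func (e KafkaError) Temporary() bool { return true }

// UnknownTopicOrPartitionError embeds KafkaError, so Error, Title, and
// Temporary are promoted from the embedded value instead of being
// re-implemented as one-line delegation methods.
type UnknownTopicOrPartitionError struct {
	KafkaError
	Topic string
}

func main() {
	err := UnknownTopicOrPartitionError{KafkaError: UnknownTopicOrPartition, Topic: "demo-topic"}
	fmt.Println(err.Error())     // promoted from the embedded KafkaError
	fmt.Println(err.Title())     // promoted
	fmt.Println(err.Temporary()) // promoted
}

Note that promotion only affects the method set: errors.Is matching for this struct still goes through its own Is method, which the patch keeps in place.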