Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hstream-kafka: upgrade max api version of produce to 2 #1601

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,18 @@ jobs:
ghc: ${{ fromJson(needs.pre-build.outputs.ghc) }}

steps:
- name: Free disk space
run: |
echo "Before..."
df -h
sudo rm -rf \
/usr/share/dotnet /usr/local/lib/android /opt/ghc \
/usr/local/share/powershell /usr/share/swift /usr/local/.ghcup \
"/usr/local/share/boost" \
/usr/lib/jvm || true
echo "After..."
df -h

- name: retrieve saved docker image
uses: actions/download-artifact@v3
with:
Expand Down Expand Up @@ -329,6 +341,18 @@ jobs:
ghc: ${{ fromJson(needs.pre-build.outputs.ghc) }}

steps:
- name: Free disk space
run: |
echo "Before..."
df -h
sudo rm -rf \
/usr/share/dotnet /usr/local/lib/android /opt/ghc \
/usr/local/share/powershell /usr/share/swift /usr/local/.ghcup \
"/usr/local/share/boost" \
/usr/lib/jvm || true
echo "After..."
df -h

- name: retrieve saved docker image
uses: actions/download-artifact@v3
with:
Expand Down
79 changes: 76 additions & 3 deletions common/kafka/src/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,14 @@ data TopicProduceDataV0 = TopicProduceDataV0
} deriving (Show, Generic)
instance Serializable TopicProduceDataV0

type PartitionProduceDataV1 = PartitionProduceDataV0

type TopicProduceDataV1 = TopicProduceDataV0

type PartitionProduceDataV2 = PartitionProduceDataV0

type TopicProduceDataV2 = TopicProduceDataV0

data PartitionProduceResponseV0 = PartitionProduceResponseV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
Expand All @@ -375,6 +383,33 @@ data TopicProduceResponseV0 = TopicProduceResponseV0
} deriving (Show, Generic)
instance Serializable TopicProduceResponseV0

type PartitionProduceResponseV1 = PartitionProduceResponseV0

type TopicProduceResponseV1 = TopicProduceResponseV0

data PartitionProduceResponseV2 = PartitionProduceResponseV2
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, baseOffset :: {-# UNPACK #-} !Int64
-- ^ The base offset.
, logAppendTimeMs :: {-# UNPACK #-} !Int64
-- ^ The timestamp returned by broker after appending the messages. If
-- CreateTime is used for the topic, the timestamp will be -1. If
-- LogAppendTime is used for the topic, the timestamp will be the broker
-- local time when the messages are appended.
} deriving (Show, Generic)
instance Serializable PartitionProduceResponseV2

data TopicProduceResponseV2 = TopicProduceResponseV2
{ name :: !Text
-- ^ The topic name
, partitionResponses :: !(KaArray PartitionProduceResponseV2)
-- ^ Each partition that we produced to within the topic.
} deriving (Show, Generic)
instance Serializable TopicProduceResponseV2

data SyncGroupRequestAssignmentV0 = SyncGroupRequestAssignmentV0
{ memberId :: !Text
-- ^ The ID of the member to assign.
Expand Down Expand Up @@ -635,11 +670,33 @@ data ProduceRequestV0 = ProduceRequestV0
} deriving (Show, Generic)
instance Serializable ProduceRequestV0

type ProduceRequestV1 = ProduceRequestV0

type ProduceRequestV2 = ProduceRequestV0

newtype ProduceResponseV0 = ProduceResponseV0
{ responses :: (KaArray TopicProduceResponseV0)
} deriving (Show, Generic)
instance Serializable ProduceResponseV0

data ProduceResponseV1 = ProduceResponseV1
{ responses :: !(KaArray TopicProduceResponseV0)
-- ^ Each produce response
, throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
} deriving (Show, Generic)
instance Serializable ProduceResponseV1

data ProduceResponseV2 = ProduceResponseV2
{ responses :: !(KaArray TopicProduceResponseV2)
-- ^ Each produce response
, throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
} deriving (Show, Generic)
instance Serializable ProduceResponseV2

data SyncGroupRequestV0 = SyncGroupRequestV0
{ groupId :: !Text
-- ^ The unique group identifier.
Expand Down Expand Up @@ -802,10 +859,18 @@ data HStreamKafkaV1
instance Service HStreamKafkaV1 where
type ServiceName HStreamKafkaV1 = "HStreamKafkaV1"
type ServiceMethods HStreamKafkaV1 =
'[ "metadata"
'[ "produce"
, "metadata"
, "apiVersions"
]

instance HasMethodImpl HStreamKafkaV1 "produce" where
type MethodName HStreamKafkaV1 "produce" = "produce"
type MethodKey HStreamKafkaV1 "produce" = 0
type MethodVersion HStreamKafkaV1 "produce" = 1
type MethodInput HStreamKafkaV1 "produce" = ProduceRequestV1
type MethodOutput HStreamKafkaV1 "produce" = ProduceResponseV1

instance HasMethodImpl HStreamKafkaV1 "metadata" where
type MethodName HStreamKafkaV1 "metadata" = "metadata"
type MethodKey HStreamKafkaV1 "metadata" = 3
Expand All @@ -825,9 +890,17 @@ data HStreamKafkaV2
instance Service HStreamKafkaV2 where
type ServiceName HStreamKafkaV2 = "HStreamKafkaV2"
type ServiceMethods HStreamKafkaV2 =
'[ "apiVersions"
'[ "produce"
, "apiVersions"
]

instance HasMethodImpl HStreamKafkaV2 "produce" where
type MethodName HStreamKafkaV2 "produce" = "produce"
type MethodKey HStreamKafkaV2 "produce" = 0
type MethodVersion HStreamKafkaV2 "produce" = 2
type MethodInput HStreamKafkaV2 "produce" = ProduceRequestV2
type MethodOutput HStreamKafkaV2 "produce" = ProduceResponseV2

instance HasMethodImpl HStreamKafkaV2 "apiVersions" where
type MethodName HStreamKafkaV2 "apiVersions" = "apiVersions"
type MethodKey HStreamKafkaV2 "apiVersions" = 18
Expand Down Expand Up @@ -861,7 +934,7 @@ instance Show ApiKey where

supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 0 0
[ ApiVersionV0 (ApiKey 0) 0 2
, ApiVersionV0 (ApiKey 1) 0 0
, ApiVersionV0 (ApiKey 2) 0 0
, ApiVersionV0 (ApiKey 3) 0 1
Expand Down
2 changes: 1 addition & 1 deletion script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
API_VERSION_PATCHES = {
"ApiVersions": (0, 2),
"Metadata": (0, 1),
"Produce": (0, 0),
"Produce": (0, 2),
}

# Variables
Expand Down
Loading