chore(block-builder): update block builder to use kafka clients directly #15433
base: main
Conversation
// No more records available
- break
+ continue
Why is this continue instead of break? And do we want to reset the backoff after a successful poll first?
this is unlikely, but if we are already caught up to the latest produced offset, this allows a few retries before failing. I don't have a strong opinion about this change
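The behavior being discussed can be sketched as below. This is a minimal, self-contained illustration of the `continue`-with-backoff pattern, not the actual block builder code: the `backoff` struct stands in for the real retry helper (likely grafana/dskit's `backoff.Backoff`), and `pollUntilRecords` is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is a minimal stand-in for the retry helper used in the real code;
// names and fields here are illustrative only.
type backoff struct {
	retries, maxRetries int
	delay               time.Duration
}

func (b *backoff) Ongoing() bool { return b.retries < b.maxRetries }
func (b *backoff) Wait()         { b.retries++; time.Sleep(b.delay) }
func (b *backoff) Reset()        { b.retries = 0 }

// pollUntilRecords keeps polling until records arrive or retries run out.
// Using `continue` on an empty poll (instead of `break`) gives a consumer
// that is already caught up to the latest offset a few retries before
// giving up, which is the case described in the comment above.
func pollUntilRecords(poll func() []string, maxRetries int) []string {
	boff := &backoff{maxRetries: maxRetries, delay: time.Millisecond}
	for boff.Ongoing() {
		records := poll()
		if len(records) == 0 {
			// No more records available; retry instead of bailing out.
			boff.Wait()
			continue
		}
		// Reset backoff on successful poll.
		boff.Reset()
		return records
	}
	return nil
}

func main() {
	calls := 0
	poll := func() []string {
		calls++
		if calls < 3 {
			return nil // caught up: nothing produced yet
		}
		return []string{"rec-1", "rec-2"}
	}
	fmt.Println(pollUntilRecords(poll, 5)) // records arrive on the 3rd poll
}
```

With a `break` instead of the `continue`, the first empty poll would end the loop immediately; with the `continue`, the loop tolerates up to `maxRetries` empty polls.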
}

// Reset backoff on successful poll
boff.Reset()

- converted := make([]AppendInput, 0, len(records))
+ converted := make([]AppendInput, 0, fs.NumRecords())
for _, record := range records {
Comparing to the existing partition reader, would it be useful to add some metrics on the number of records processed etc.?
definitely! I'll take that in a follow-up :)
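As a rough sketch of what such a follow-up might track per poll (the type and field names here are hypothetical; the real implementation would likely register Prometheus counters rather than plain atomics, to match the existing partition reader's metrics):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// consumerMetrics is a hypothetical sketch of per-poll counters a follow-up
// could add: polls made, records processed, and bytes processed.
type consumerMetrics struct {
	polls            atomic.Int64
	recordsProcessed atomic.Int64
	bytesProcessed   atomic.Int64
}

// record is a stand-in for a consumed kafka record.
type record struct{ value []byte }

// observePoll updates the counters after each successful poll.
func (m *consumerMetrics) observePoll(records []record) {
	m.polls.Add(1)
	m.recordsProcessed.Add(int64(len(records)))
	for _, r := range records {
		m.bytesProcessed.Add(int64(len(r.value)))
	}
}

func main() {
	var m consumerMetrics
	m.observePoll([]record{{value: []byte("abc")}, {value: []byte("de")}})
	fmt.Println(m.polls.Load(), m.recordsProcessed.Load(), m.bytesProcessed.Load()) // 1 2 5
}
```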
What this PR does / why we need it:
Currently, block builders create an ephemeral partition reader for each new job, but this fails to work as expected because duplicate kprom metric registration is triggered on every new client creation. We could get around this by wrapping the registerer with `partition_id` and `worker_id` labels and carefully deregistering them when job processing is complete. But at this point in time, I prefer directly using the kafka clients (one per block builder worker), which offers better control over adding and removing the partitions to consume. We can look into a partition reader refactor down the line if it's necessary.
This PR updates the block builder to use direct kafka clients, one client per worker. We add the partition consume offset when we start a job and remove it at the end of the job, allowing the same client to be reused.
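The job lifecycle described above can be sketched as follows. Note this is a mock, not the PR's actual code: `mockClient` stands in for the real kafka client (franz-go's `kgo.Client` exposes `AddConsumePartitions`/`RemoveConsumePartitions`, which is the kind of control meant here), and `runJob` and the topic name are illustrative.

```go
package main

import "fmt"

// mockClient stands in for a long-lived kafka client held by one block
// builder worker; the real client's add/remove calls take different
// argument shapes, this mock only mirrors the lifecycle.
type mockClient struct {
	assigned map[string]map[int32]int64 // topic -> partition -> start offset
}

func newMockClient() *mockClient {
	return &mockClient{assigned: map[string]map[int32]int64{}}
}

func (c *mockClient) AddConsumePartitions(topic string, partition int32, offset int64) {
	if c.assigned[topic] == nil {
		c.assigned[topic] = map[int32]int64{}
	}
	c.assigned[topic][partition] = offset
}

func (c *mockClient) RemoveConsumePartitions(topic string, partition int32) {
	delete(c.assigned[topic], partition)
}

// runJob shows the lifecycle from the PR description: add the partition's
// consume offset at job start, process, then remove it so the same client
// can be reused for the next job without creating a new one (and without
// re-registering metrics).
func runJob(c *mockClient, topic string, partition int32, startOffset int64) {
	c.AddConsumePartitions(topic, partition, startOffset)
	defer c.RemoveConsumePartitions(topic, partition)
	// ... poll and process the job's records here ...
	fmt.Printf("processing %s/%d from offset %d\n", topic, partition, startOffset)
}

func main() {
	client := newMockClient()
	runJob(client, "example-topic", 3, 1024)
	fmt.Println(len(client.assigned["example-topic"])) // 0: partition removed after the job
}
```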
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Checklist
- Reviewed the CONTRIBUTING.md guide (required)
- `feat` PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
- If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR