[Source-Mysql] Set default cursor value for cdc mode #28756
source-mysql test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest | ❌ |
Acceptance tests | ✅ |
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql test
source-mysql-strict-encrypt test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ✅ |
Build connector tar | ✅ |
Build source-mysql-strict-encrypt docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mysql-strict-encrypt:integrationTest | ❌ |
Acceptance tests | ❌ |
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql-strict-encrypt test
Let's carry out tests similar to those we did for PostgresSource, and highlight what is going to break and how we plan to migrate.
public class MySqlCdcConnectorMetadataInjector implements CdcMetadataInjector {

  final Instant emittedAt;
this can be private
.../connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java
@@ -147,6 +158,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
    properties.set(CDC_LOG_POS, numberType);
    properties.set(CDC_UPDATED_AT, stringType);
    properties.set(CDC_DELETED_AT, stringType);
    properties.set(CDC_DEFAULT_CURSOR, stringType);
@subodh1810 Was this where you were referring to?
source-mysql test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ❌ |
319a2e9 to f9a9dd0
source-mysql-strict-encrypt test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ✅ |
Build connector tar | ✅ |
Build source-mysql-strict-encrypt docker image for platform linux/x86_64 | ❌ |
Had a question, mostly because I'm not familiar with what emitted_at / file / pos represent.
(I still haven't actually tested this. Do DB sources have a test MySQL instance that's already set up for CDC?)
@Override
public void addMetaData(final ObjectNode event, final JsonNode source) {
  final String cdcDefaultCursor =
      String.format("%s_%s_%s", emittedAt.toString(), source.get("file").asText(), source.get("pos").asText());
are all three of these values guaranteed to stringify to the same length? e.g. is it possible to end up with two records with cursors
123_x_y
4_x_y
where the second one should sort lower (because 4 < 123), but we actually sort it higher (because '4' > '1')?
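The concern above can be reproduced in a few lines. This is only an illustrative sketch: the class and method names (`CursorSortDemo`, `stringOrderAgreesWithNumericOrder`) are hypothetical and not part of the connector. It compares the leading numeric component of two cursors both as numbers and as strings, and reports whether the two orderings agree.

```java
final class CursorSortDemo {

  // Hypothetical helper: returns true when comparing the two values as strings
  // gives the same ordering as comparing them as numbers.
  static boolean stringOrderAgreesWithNumericOrder(final String a, final String b) {
    final int numeric = Integer.signum(Long.compare(Long.parseLong(a), Long.parseLong(b)));
    final int lexical = Integer.signum(a.compareTo(b));
    return numeric == lexical;
  }

  public static void main(final String[] args) {
    // Different lengths: 4 < 123 numerically, but "4" > "123" as strings.
    System.out.println(stringOrderAgreesWithNumericOrder("4", "123")); // prints "false"
    // Same length: string order and numeric order agree.
    System.out.println(stringOrderAgreesWithNumericOrder("4743280", "4930394")); // prints "true"
  }
}
```

The second call uses the two log positions quoted later in this thread, which happen to have the same number of digits.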
+1 to verifying what Ed said.
One other thing that I noticed while testing out CDC via PK is that the timestamp looks a little different based on whether debezium or Airbyte is populating the metadata fields.
emitted_at for records emitted by Airbyte : 2023-07-27T17:14:38Z
emitted_at for records emitted by Debezium : 2023-07-27T17:14:32.043504Z
Of course - for this PR only Debezium is ever emitting records. Can we confirm that no matter the format, if one timestamp is before the other the cursor order will be respected? If not, we should file a ticket to make sure all records (whether emitted by Airbyte in the PK initial load sync, or DBZ on incremental runs) have the same ts structure
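Whether mixed timestamp formats preserve cursor order can be checked directly. The sketch below uses a hypothetical class (`TimestampOrderDemo`) and two constructed timestamps that fall in the same second, not values from a real sync. It relies on the documented behaviour of `java.time.Instant.toString()`, which prints zero, three, six, or nine fractional digits as needed, so two instants can stringify to different lengths.

```java
import java.time.Instant;

final class TimestampOrderDemo {

  // Hypothetical helper: true when the ISO-8601 strings sort the same way as the instants.
  static boolean stringOrderMatchesTimeOrder(final Instant a, final Instant b) {
    final int byTime = Integer.signum(a.compareTo(b));
    final int byString = Integer.signum(a.toString().compareTo(b.toString()));
    return byTime == byString;
  }

  public static void main(final String[] args) {
    // Constructed example: same second, one value without a fractional part.
    final Instant whole = Instant.parse("2023-07-27T17:14:38Z");
    final Instant fractional = Instant.parse("2023-07-27T17:14:38.043504Z");
    // 'Z' sorts after '.', so the earlier instant gets the larger string.
    System.out.println(stringOrderMatchesTimeOrder(whole, fractional)); // prints "false"
  }
}
```

Timestamps that differ in the seconds field or earlier still sort correctly as strings; the mismatch only appears when the two values share the same second and differ in whether a fractional part is printed.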
@edgao, just verified with 2 different syncs and this is what the composite cursor looks like for those.
2023-08-01T20:54:18.167934430Z_mysql-bin.000108_4743280
2023-08-01T21:45:42.794296969Z_mysql-bin.000108_4930394
The timestamps all have 9 trailing digits. This is most likely the Debezium emitted at you mentioned @akashkulk. If they're all the same length, I believe we're good to go. See screenshots below:
Sorry for the confusion here. Got confused between emitted_at and updated_at. This is all populated by a timestamp created by the Airbyte source connector at the start of the sync, so it should be the same format throughout.
do file + pos also guarantee same string length across records? (this is super edgecasey)
We cannot remove the log position. You could have two events (INSERT, UPDATE) for the same primary key in the same log file, and Airbyte might see both records in the same sync, giving them the same emittedAt value. The cursor for these two records would then be exactly the same. That's why we need the position.
But I am not fully convinced about the scenario where the length of the cursor doesn't match. Will all the destinations behave the same if the cursors have different lengths? How would Snowflake and BigQuery behave? Will the sort order be correct?
The more I think about this, the more it depends on the destination's sort order. And I don't want to be in a situation where two destinations behave differently.
What if we don't use the file and log position, and instead use a monotonically increasing number? We take the emittedAt value, convert it to epoch seconds or milliseconds, and for each record that we see, we keep adding 1 to the original value and use that value as the cursor. Since Debezium guarantees that the records it returns are in the correct order, this will always increase in the right way. Also, for records read during the snapshot, we should make sure that the cursor value stays the same for all records. The addition only happens for records read from binlogs.
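A minimal sketch of the counter idea proposed above, not the code that was merged. The class name `CounterCursorSketch` and its method names are hypothetical, and epoch milliseconds is just one of the two units the comment mentions. Snapshot records all receive the base value; each binlog record receives the previous value plus one.

```java
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;

final class CounterCursorSketch {

  // Base value derived once from the sync's emittedAt timestamp (assumed: epoch millis).
  private final long base;
  // Last value handed out to a binlog record; starts at the base.
  private final AtomicLong lastIssued;

  CounterCursorSketch(final Instant emittedAt) {
    this.base = emittedAt.toEpochMilli();
    this.lastIssued = new AtomicLong(base);
  }

  // Every record read during the snapshot shares the same cursor value.
  long snapshotCursor() {
    return base;
  }

  // Each record read from the binlog gets the next value, in the order Debezium returns it.
  long nextBinlogCursor() {
    return lastIssued.incrementAndGet();
  }

  public static void main(final String[] args) {
    final CounterCursorSketch cursor = new CounterCursorSketch(Instant.now());
    System.out.println(cursor.snapshotCursor());
    System.out.println(cursor.nextBinlogCursor());
    System.out.println(cursor.nextBinlogCursor());
  }
}
```

Because the cursor is a number, destinations compare it numerically and string length no longer matters. The sketch does not address how values from one sync relate to values from the next, which depends on how many records a sync emits relative to the gap between base timestamps.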
What do others think of this idea?^
This would also save us in a scenario where the customer changes the binlog file name format between syncs and would be more durable behaviour.
Implemented!
Quick comment on testing cursor sortability
We need to settle on the right cursor value. The fact that the length of the cursor will not stay exactly the same makes me a bit uncomfortable, because now a lot depends on the sort order of the destination. I have proposed a different idea; please take a look at it.
6d6a24d to 7d9ac53
cf26802 to e2417f7
/legacy-test connector=connectors/source-mysql
Build Passed
Test summary info:
Use 100M instead of 1B
updated name; use the same instance of injector so there won't be a collision
fa2b876 to 12a6b99
source-mysql test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ❌ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest | ❌ |
Acceptance tests | ✅ |
source-mysql-strict-encrypt test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml | ❌ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Build connector tar | ✅ |
Build source-mysql-strict-encrypt docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mysql-strict-encrypt:integrationTest | ❌ |
Acceptance tests | ❌ |
/legacy-test connector=connectors/source-mysql-strict-encrypt
Build Passed
Test summary info:
source-mysql-strict-encrypt test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml | ❌ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ❌ |
Build connector tar | ✅ |
Build source-mysql-strict-encrypt docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mysql-strict-encrypt:integrationTest | ❌ |
Acceptance tests | ❌ |
/approve-and-merge reason="Merging since all tests passed and CI is flaky"
source-mysql test report (commit
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ❌ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest | ❌ |
Acceptance tests | ✅ |
What
27340
To prepare for Destinations V2, CDC-enabled DB sources need a predefined cursor for normalization to operate properly.
How
During the discover phase, source_defined_cursor is set to true, and a new composite cursor column called _ab_cdc_cursor is selected as the predefined cursor. This column is constructed from:
The numeric cursor value will be computed for every record and allows for fast comparison in tie-breaking scenarios for records with the same Primary Key.
Recommended reading order
MySqlSource.java
MySqlCdcConnectorMetadataInjector.java
🚨 User Impact 🚨
New CDC syncs that have refreshed their source schema will see this cursor field be chosen.
This update requires that users do a full reset OR drop their SCD tables to ensure that syncs continue operating normally.
Customers who reset their data will continue to see old behavior until their source goes through the discover() phase again.