
[Source-Mysql] Set default cursor value for cdc mode #28756

Merged
merged 14 commits into master from mysql-cdc-cursor
Aug 17, 2023

Conversation

nguyenaiden
Contributor

@nguyenaiden nguyenaiden commented Jul 26, 2023

What

#27340
To prepare for Destination v2, CDC-enabled database sources need a predefined cursor for normalization to operate properly.

How

During the discover phase, source_defined_cursor is set to true, and a new composite cursor column called _ab_cdc_cursor is selected as the predefined cursor. This column is constructed by:

  1. Converting the emittedAt timestamp, generated at the read step, to epoch seconds.
  2. Converting that value to nanoseconds.
  3. Initializing a thread-safe recordCounter at 1.
  4. Adding the recordCounter to the converted timestamp.
  5. Incrementing the counter.

The numeric cursor value is computed for every record and allows fast comparison in tie-breaking scenarios for records with the same primary key.
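The steps above can be sketched roughly as follows. This is an illustrative sketch, not the connector's actual code; the class and method names are made up:

```java
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the _ab_cdc_cursor construction described above.
public class CdcCursorSketch {
    // Step 3: a thread-safe counter, starting at 1.
    private final AtomicLong recordCounter = new AtomicLong(1);
    private final Instant emittedAt;

    public CdcCursorSketch(final Instant emittedAt) {
        this.emittedAt = emittedAt;
    }

    public long nextCursorValue() {
        // Steps 1-2: epoch seconds, scaled to nanoseconds.
        final long baseNanos = emittedAt.getEpochSecond() * 1_000_000_000L;
        // Steps 4-5: add the counter to the base, then increment it,
        // so each record in the sync gets a strictly larger cursor.
        return baseNanos + recordCounter.getAndIncrement();
    }

    public static void main(String[] args) {
        CdcCursorSketch sketch = new CdcCursorSketch(Instant.ofEpochSecond(1_690_000_000L));
        System.out.println(sketch.nextCursorValue()); // 1690000000000000001
        System.out.println(sketch.nextCursorValue()); // 1690000000000000002
    }
}
```

Because the result is a single long, destinations compare cursors numerically rather than lexicographically, which is what makes the tie-breaking reliable.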

Updated stream state
[
    {
      "streamDescriptor": {
        "name": "users",
        "namespace": "1gb"
      },
      "streamState": {
        "stream_name": "users",
        "cursor_field": [
          "_ab_cdc_cursor"
        ],
        "stream_namespace": "1gb"
      }
    }
  ]

Recommended reading order

  1. MySqlSource.java
  2. MySqlCdcConnectorMetadataInjector.java
  3. All the test files

🚨 User Impact 🚨

New CDC syncs that have refreshed their source schema will see this cursor field chosen.
This update requires users to do a full reset OR drop their SCD tables to ensure that syncs continue operating normally.
Customers who reset their data will continue to see the old behavior until their source goes through the discover() phase again.

@github-actions
Contributor

github-actions bot commented Jul 26, 2023

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file with any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the GitHub label checklist-action-run on/off to re-run the checklist CI.

@nguyenaiden nguyenaiden marked this pull request as ready for review July 26, 2023 21:36
@nguyenaiden nguyenaiden requested a review from a team as a code owner July 26, 2023 21:36
@nguyenaiden nguyenaiden changed the title Construct cdc default cursor for mysql [Source-Mysql] Set default cursor value for cdc mode Jul 26, 2023
@octavia-squidington-iii
Collaborator

source-mysql test report (commit 9256d31864) - ❌

⏲️ Total pipeline duration: 21mn10s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql test

@octavia-squidington-iii
Collaborator

source-mysql-strict-encrypt test report (commit 9256d31864) - ❌

⏲️ Total pipeline duration: 13mn46s

Step Result
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml
Connector version semver check
QA checks
Build connector tar
Build source-mysql-strict-encrypt docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql-strict-encrypt:integrationTest
Acceptance tests

🔗 View the logs here

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql-strict-encrypt test



Contributor

@subodh1810 subodh1810 left a comment


Let's carry out tests similar to those we did for PostgresSource and highlight what's going to break and how we plan to migrate.


public class MySqlCdcConnectorMetadataInjector implements CdcMetadataInjector {

final Instant emittedAt;
Contributor


this can be private

@@ -147,6 +158,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
properties.set(CDC_LOG_POS, numberType);
properties.set(CDC_UPDATED_AT, stringType);
properties.set(CDC_DELETED_AT, stringType);
properties.set(CDC_DEFAULT_CURSOR, stringType);
Contributor Author


@subodh1810 Is this what you were referring to?

@octavia-squidington-iii
Copy link
Collaborator

source-mysql-strict-encrypt test report (commit 319a2e9ec3) - ✅

⏲️ Total pipeline duration: 02mn27s

Step Result

🔗 View the logs here


@octavia-squidington-iii
Copy link
Collaborator

source-mysql test report (commit 319a2e9ec3) - ❌

⏲️ Total pipeline duration: 02mn34s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64

🔗 View the logs here


@octavia-squidington-iii
Copy link
Collaborator

source-mysql test report (commit f9a9dd0dc6) - ✅

⏲️ Total pipeline duration: 02mn36s

Step Result

🔗 View the logs here


@octavia-squidington-iii
Copy link
Collaborator

source-mysql-strict-encrypt test report (commit f9a9dd0dc6) - ❌

⏲️ Total pipeline duration: 02mn26s

Step Result
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml
Connector version semver check
QA checks
Build connector tar
Build source-mysql-strict-encrypt docker image for platform linux/x86_64

🔗 View the logs here


Contributor

@edgao edgao left a comment


had a question, mostly just not familiar with what emitted_at / file / pos represent.

(still haven't actually tested this, do the DB sources have a test MySQL instance that's already set up for CDC?)

@Override
public void addMetaData(final ObjectNode event, final JsonNode source) {
final String cdcDefaultCursor =
String.format("%s_%s_%s", emittedAt.toString(), source.get("file").asText(), source.get("pos").asText());
Contributor


are all three of these values guaranteed to stringify to the same length? e.g. is it possible to end up with two records with cursors

123_x_y
4_x_y

where the second one should sort lower (because 4 < 123), but we actually sort it higher (because '4' > '1')
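This pitfall is easy to reproduce with Java's String.compareTo; the cursor values below are made up for illustration:

```java
// Demonstrates the sorting concern raised above: lexicographic string
// comparison disagrees with numeric order when segments differ in length.
public class CursorSortPitfall {
    public static void main(String[] args) {
        String a = "123_x_y"; // numerically larger position (123)
        String b = "4_x_y";   // numerically smaller position (4)

        // String comparison looks at the first differing character:
        // '4' > '1', so b sorts AFTER a even though 4 < 123.
        System.out.println(b.compareTo(a) > 0); // true

        // Zero-padding the numeric segment would restore the correct order.
        System.out.println("004_x_y".compareTo("123_x_y") < 0); // true
    }
}
```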

Contributor


+1 to verifying what Ed said.

One other thing that I noticed while testing out CDC via PK is that the timestamp looks a little different based on whether debezium or Airbyte is populating the metadata fields.

emitted_at for records emitted by Airbyte : 2023-07-27T17:14:38Z
emitted_at for records emitted by Debezium : 2023-07-27T17:14:32.043504Z

Of course - for this PR only Debezium is ever emitting records. Can we confirm that no matter the format, if one timestamp is before the other the cursor order will be respected? If not, we should file a ticket to make sure all records (whether emitted by Airbyte in the PK initial load sync, or DBZ on incremental runs) have the same ts structure

Contributor Author

@nguyenaiden nguyenaiden Aug 1, 2023


@edgao, just verified with 2 different syncs and this is what the composite cursor looks like for those.

2023-08-01T20:54:18.167934430Z_mysql-bin.000108_4743280
2023-08-01T21:45:42.794296969Z_mysql-bin.000108_4930394

The timestamps all have nine trailing digits. This is most likely the Debezium emitted_at you mentioned @akashkulk. If they're all the same length, I believe we're good to go.
[screenshots omitted]

Contributor


Sorry for the confusion here. Got confused between emitted_at and updated_at. This is all populated by a timestamp created by the Airbyte source connector at the start of the sync, so it should be the same format throughout.

Contributor


do file + pos also guarantee same string length across records? (this is super edgecasey)

Contributor


We cannot remove the log position. You could have two events (INSERT, UPDATE) on the same primary key in the same log file, and Airbyte might see both of these records in the same sync, giving them the same emittedAt value. Thus the cursors for these two records would be exactly the same. That's why we need the position.

But I am not fully convinced about the scenario where the lengths of the cursors don't match. Will all the destinations behave the same if the cursors are of different lengths? How would Snowflake and BigQuery behave? Will the sort order be correct?

Contributor


The more I think of this, the more it becomes dependent on the destination's sort order. And I don't want to be in a situation where two destinations behave differently.
What if we don't use the file and log position and instead use a monotonically increasing numeric value? We take the emittedAt value, convert it to epoch seconds or milliseconds, and for each record that we see, we add 1 to the original value and use that as the cursor. Since Debezium guarantees that the order of records it returns is correct, this will always increase in the right way. Also, for records read during the snapshot we should make sure the cursor value for all records stays the same; the addition only happens for records read from the binlogs.
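A minimal sketch of this proposal, with illustrative names rather than the final implementation: snapshot records all share the base value, while binlog records get a strictly increasing cursor.

```java
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the monotonic numeric cursor proposed above.
public class MonotonicCdcCursor {
    private final long baseNanos;
    private final AtomicLong counter = new AtomicLong(0);

    public MonotonicCdcCursor(final Instant emittedAt) {
        this.baseNanos = emittedAt.getEpochSecond() * 1_000_000_000L;
    }

    public long cursorFor(final boolean isSnapshotRecord) {
        // Snapshot rows all keep the same base value; binlog rows strictly
        // increase, relying on Debezium emitting binlog events in order.
        return isSnapshotRecord ? baseNanos : baseNanos + counter.incrementAndGet();
    }

    public static void main(String[] args) {
        MonotonicCdcCursor cursor = new MonotonicCdcCursor(Instant.ofEpochSecond(1_700_000_000L));
        System.out.println(cursor.cursorFor(true));  // snapshot: 1700000000000000000
        System.out.println(cursor.cursorFor(true));  // snapshot: same value again
        System.out.println(cursor.cursorFor(false)); // binlog: 1700000000000000001
        System.out.println(cursor.cursorFor(false)); // binlog: 1700000000000000002
    }
}
```

Since the cursor is purely numeric, the destination's string collation never enters the picture, which is the point of the proposal.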

Contributor


What do others think of this idea?^

Contributor


This would also save us in a scenario where the customer changes the binlog file name format between syncs and would be more durable behaviour.

Contributor Author


Implemented!

Contributor

@akashkulk akashkulk left a comment


Quick comment on testing cursor sortability

@octavia-squidington-iii
Collaborator

source-mysql test report (commit 6d6a24ddfa) - ❌

⏲️ Total pipeline duration: 18mn46s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here


@octavia-squidington-iii
Collaborator

source-mysql-strict-encrypt test report (commit 6d6a24ddfa) - ❌

⏲️ Total pipeline duration: 13mn38s

Step Result
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml
Connector version semver check
QA checks
Build connector tar
Build source-mysql-strict-encrypt docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql-strict-encrypt:integrationTest
Acceptance tests

🔗 View the logs here


Contributor

@subodh1810 subodh1810 left a comment


We need to settle on the right cursor value. The fact that the length of the cursor will not stay exactly the same makes me a bit uncomfortable, because now a lot depends on the sort order of the destination. I have proposed a different idea; please take a look at it.


@octavia-squidington-iii
Collaborator

source-mysql test report (commit 7d9ac53601) - ❌

⏲️ Total pipeline duration: 11.27s

Step Result

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud


@nguyenaiden
Contributor Author

nguyenaiden commented Aug 17, 2023

/legacy-test connector=connectors/source-mysql

🕑 connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/5893901824
✅ connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/5893901824
No Python unittests run

Build Passed

Test summary info:

=========================== short test summary info ============================
SKIPPED [1] ../app/connector_acceptance_test/plugin.py:63: Skipping TestConnection.test_check: not found in the config.
SKIPPED [1] ../app/connector_acceptance_test/plugin.py:63: Skipping TestDiscovery.test_discover: not found in the config.
SKIPPED [1] ../app/connector_acceptance_test/plugin.py:63: Skipping TestBasicRead.test_read: not found in the config.
SKIPPED [1] ../app/connector_acceptance_test/plugin.py:63: Skipping TestFullRefresh.test_sequential_reads: not found in the config.
SKIPPED [1] ../app/connector_acceptance_test/plugin.py:63: Skipping TestIncremental.test_two_sequential_reads: not found in the config.
SKIPPED [1] ../app/connector_acceptance_test/tests/test_core.py:103: The previous and actual specifications are identical.
============ 23 passed, 6 skipped, 33 warnings in 261.45s (0:04:21) ============

@octavia-squidington-iii
Collaborator

source-mysql test report (commit e2417f7d62) - ❌

⏲️ Total pipeline duration: 19mn54s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud


@octavia-squidington-iii
Collaborator

source-mysql-strict-encrypt test report (commit e2417f7d62) - ❌

⏲️ Total pipeline duration: 12mn54s

Step Result
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql-strict-encrypt docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql-strict-encrypt:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud


@nguyenaiden
Contributor Author

nguyenaiden commented Aug 17, 2023

/legacy-test connector=connectors/source-mysql-strict-encrypt

🕑 connectors/source-mysql-strict-encrypt https://github.com/airbytehq/airbyte/actions/runs/5894245062
❌ connectors/source-mysql-strict-encrypt https://github.com/airbytehq/airbyte/actions/runs/5894245062
🐛

Build Passed

Test summary info:

All Passed

@octavia-squidington-iii
Collaborator

source-mysql-strict-encrypt test report (commit 12a6b9976e) - ❌

⏲️ Total pipeline duration: 14mn46s

Step Result
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql-strict-encrypt docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql-strict-encrypt:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud


@nguyenaiden
Contributor Author

/approve-and-merge reason="Merging since all tests passed and CI is flaky"

@octavia-approvington
Contributor

This is really good
simply the best

@octavia-approvington octavia-approvington merged commit 6ee426a into master Aug 17, 2023
17 checks passed
@octavia-approvington octavia-approvington deleted the mysql-cdc-cursor branch August 17, 2023 18:13
@octavia-squidington-iii
Collaborator

source-mysql test report (commit 12a6b9976e) - ❌

⏲️ Total pipeline duration: 18mn33s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud


8 participants