Skip to content

Commit

Permalink
[Improve][Connector-V2][Doris] Refactor some Doris Sink code as well …
Browse files Browse the repository at this point in the history
…as support 2pc and cdc (apache#4235)

* [Improve][Connector-V2][Doris]Refactor some Doris Sink code and support 2pc

* fix package

* code style

* doc and test

* doc style

* Add cdc and override Serializer

* fix

* add test and fix code and fix doc

* add doc

* fix test

* fix test2

* fix test4

* code style

* fix test5

* add abort

* fix review

* add release-note

* fix

* fix

* fix

* fix
  • Loading branch information
zy-kkk authored Mar 8, 2023
1 parent 8f11b77 commit 7c4005a
Show file tree
Hide file tree
Showing 51 changed files with 3,950 additions and 1,193 deletions.
8 changes: 5 additions & 3 deletions docs/en/connector-v2/Error-Quick-Reference-Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,11 @@ problems encountered by users.

## Doris Connector Error Codes

| code | description | solution |
|----------|----------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|
| Doris-01 | Writing records to Doris failed. | When users encounter this error code, it means that writing records to Doris failed, please check data from files whether is correct |
| code | description | solution |
|----------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------|
| Doris-01 | stream load error. | When users encounter this error code, it means that stream load to Doris failed, please check data from files whether is correct. |
| Doris-02 | commit error. | When users encounter this error code, it means that commit to Doris failed, please check network. |
| Doris-03 | rest service error. | When users encounter this error code, it means that rest service failed, please check network and config. |

## SelectDB Cloud Connector Error Codes

Expand Down
113 changes: 53 additions & 60 deletions docs/en/connector-v2/sink/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,37 @@
Used to send data to Doris. Both support streaming and batch mode.
The internal implementation of Doris sink connector is cached and imported by stream load in batches.

:::tip

Version Supported

* exactly-once & cdc supported `Doris version is >= 1.1.x`
* Array data type supported `Doris version is >= 1.2.x`
* Map data type will be support in `Doris version is 2.x`

:::

## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-----------------------------|--------|----------|-----------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | - |
| table | string | yes | - |
| labelPrefix | string | no | - |
| batch_max_rows | long | no | 1024 |
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
| batch_interval_ms | int | no | 1000 |
| max_retries | int | no | 1 |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| doris.config | map | no | - |

### node_urls [list]

`Doris` cluster address, the format is `["fe_ip:fe_http_port", ...]`
| name | type | required | default value |
|--------------------|--------|----------|---------------|
| fenodes | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| table.identifier | string | yes | - |
| sink.label-prefix | string | yes | - |
| sink.enable-2pc | bool | no | true |
| sink.enable-delete | bool | no | false |
| doris.config | map | yes | - |

### fenodes [string]

`Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."`

### username [string]

Expand All @@ -41,47 +47,29 @@ The internal implementation of Doris sink connector is cached and imported by st

`Doris` user password

### database [string]

The name of `Doris` database

### table [string]
### table.identifier [string]

The name of `Doris` table

### labelPrefix [string]

The prefix of `Doris` stream load label
### sink.label-prefix [string]

### batch_max_rows [long]
The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel.

For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris
### sink.enable-2pc [bool]

### batch_max_bytes [int]
Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).

For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris
### sink.enable-delete [bool]

### batch_interval_ms [int]
Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this link:

For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris

### max_retries [int]

The number of retries to flush failed

### retry_backoff_multiplier_ms [int]

Using as a multiplier for generating the next delay for backoff

### max_retry_backoff_ms [int]

The amount of time to wait before attempting to retry a request to `Doris`
https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual

### doris.config [map]

The parameter of the stream load `data_desc`, you can get more detail at this link:

https://doris.apache.org/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD/
https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD

#### Supported import data formats

Expand All @@ -94,15 +82,15 @@ Use JSON format to import data
```
sink {
Doris {
nodeUrls = ["e2e_dorisdb:8030"]
fenodes = ["e2e_dorisdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
batch_max_rows = 100
table.identifier = "test.e2e_table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
format = "JSON"
strip_outer_array = true
format="json"
read_json_by_line="true"
}
}
}
Expand All @@ -114,16 +102,14 @@ Use CSV format to import data
```
sink {
Doris {
nodeUrls = ["e2e_dorisdb:8030"]
fenodes = ["e2e_dorisdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
batch_max_rows = 100
sink.properties.format = "CSV"
sink.properties.column_separator = ","
table.identifier = "test.e2e_table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_csv"
doris.config = {
format = "CSV"
format = "csv"
column_separator = ","
}
}
Expand All @@ -140,3 +126,10 @@ sink {

- [Improve] Change Doris Config Prefix [3856](https://github.com/apache/incubator-seatunnel/pull/3856)

- [Improve] Refactor some Doris Sink code as well as support 2pc and cdc [4235](https://github.com/apache/incubator-seatunnel/pull/4235)

:::tip

PR 4235 is an incompatible modification to PR 3856. Please refer to PR 4235 to use the new Doris connector

:::
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
- [File] Support column projection #4105
- [Github] Add github source connector #4155
- [Jdbc] Add database field to sink config #4199
- [Doris] Refactor some Doris Sink code as well as support 2pc and cdc #4235
### Zeta Engine
- [Chore] Remove unnecessary dependencies #3795
- [Core] Improve job restart of all node down #3784
Expand Down
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-doris/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,10 @@
<artifactId>seatunnel-format-text</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
</dependencies>
</project>

This file was deleted.

Loading

0 comments on commit 7c4005a

Please sign in to comment.