
IoTDB

IoTDB sink connector

Description

Used to write data to IoTDB.

:::tip

There is a Thrift version conflict between IoTDB and Spark. Therefore, you need to execute the following commands to resolve it:

rm -f $SPARK_HOME/jars/libthrift*
cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/

:::

Key features

IoTDB supports the exactly-once feature through idempotent writing. If two pieces of data have the same key and timestamp, the new data will overwrite the old one.

Options

| name | type | required | default value |
|-----------------------------|---------|----------|----------------------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| key_device | string | yes | - |
| key_timestamp | string | no | processing time |
| key_measurement_fields | array | no | exclude device & timestamp |
| storage_group | string | no | - |
| batch_size | int | no | 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| default_thrift_buffer_size | int | no | - |
| max_thrift_frame_size | int | no | - |
| zone_id | string | no | - |
| enable_rpc_compression | boolean | no | - |
| connection_timeout_in_ms | int | no | - |
| common-options | | no | - |

node_urls [list]

IoTDB cluster addresses, in the format ["host:port", ...]
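
For example, a minimal sketch for a multi-node cluster (the hostnames are placeholders; 6667 is the default IoTDB RPC port used in the examples below):

sink {
  IoTDB {
    # placeholder hostnames; list every node of the cluster
    node_urls = ["iotdb-node-1:6667", "iotdb-node-2:6667"]
    username = "root"
    password = "root"
  }
}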

username [string]

IoTDB username

password [string]

IoTDB user password

key_device [string]

Specify the field name of the IoTDB deviceId in SeaTunnelRow

key_timestamp [string]

Specify the field name of the IoTDB timestamp in SeaTunnelRow. If not specified, processing time is used as the timestamp

key_measurement_fields [array]

Specify the field names of the IoTDB measurement list in SeaTunnelRow. If not specified, all fields are included except device & timestamp

storage_group [string]

Specify the device storage group (path prefix)

example: deviceId = ${storage_group} + "." + ${key_device}
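
For example, a minimal sketch, assuming the upstream device_name field carries only the device suffix (e.g. device_a) rather than the full path:

sink {
  IoTDB {
    ...
    # deviceId becomes ${storage_group} + "." + ${key_device}
    key_device = "device_name"
    storage_group = "root.test_group"
  }
}

With this configuration, a row whose device_name field is "device_a" is written to the deviceId root.test_group.device_a.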

batch_size [int]

For batch writing, when the number of buffered records reaches batch_size or the elapsed time reaches batch_interval_ms, the buffered data is flushed to IoTDB

batch_interval_ms [int]

For batch writing, when the number of buffered records reaches batch_size or the elapsed time reaches batch_interval_ms, the buffered data is flushed to IoTDB

max_retries [int]

The number of retries when a flush fails

retry_backoff_multiplier_ms [int]

Used as a multiplier for generating the next delay for backoff

max_retry_backoff_ms [int]

The maximum amount of time (in ms) to wait before attempting to retry a request to IoTDB
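
A minimal sketch combining the three retry options (the values are illustrative, not recommendations):

sink {
  IoTDB {
    ...
    max_retries = 3                    # retry a failed flush up to 3 times
    retry_backoff_multiplier_ms = 200  # multiplier used to compute the next backoff delay
    max_retry_backoff_ms = 10000       # upper bound on the backoff delay
  }
}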

default_thrift_buffer_size [int]

Thrift init buffer size in IoTDB client

max_thrift_frame_size [int]

Thrift max frame size in IoTDB client

zone_id [string]

java.time.ZoneId in IoTDB client

enable_rpc_compression [boolean]

Enable RPC compression in the IoTDB client

connection_timeout_in_ms [int]

The maximum time (in ms) to wait when connecting to IoTDB
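
A minimal sketch of the client-level options (all values are illustrative; zone_id accepts any valid java.time.ZoneId string):

sink {
  IoTDB {
    ...
    default_thrift_buffer_size = 1024   # initial Thrift buffer size
    max_thrift_frame_size = 268435456   # maximum Thrift frame size
    zone_id = "UTC"                     # java.time.ZoneId used by the client
    enable_rpc_compression = true       # enable RPC compression
    connection_timeout_in_ms = 5000     # connection timeout in milliseconds
  }
}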

common options

Sink plugin common parameters. Please refer to Sink Common Options for details

Examples

Case1

Common options:

sink {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    batch_size = 1024
    batch_interval_ms = 1000
  }
}

When you set key_device to device_name, for example:

sink {
  IoTDB {
    ...
    key_device = "device_name"
  }
}

The upstream SeaTunnelRow data format is as follows:

| device_name | field_1 | field_2 |
|--------------------------|---------|---------|
| root.test_group.device_a | 1001 | 1002 |
| root.test_group.device_b | 2001 | 2002 |
| root.test_group.device_c | 3001 | 3002 |

The data written to IoTDB is as follows:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+----------+-----------+
|                    Time|                  Device|   field_1|    field_2|
+------------------------+------------------------+----------+-----------+
|2022-09-26T17:50:01.201Z|root.test_group.device_a|      1001|       1002|
|2022-09-26T17:50:01.202Z|root.test_group.device_b|      2001|       2002|
|2022-09-26T17:50:01.203Z|root.test_group.device_c|      3001|       3002|
+------------------------+------------------------+----------+-----------+

Case2

When you set key_device, key_timestamp, and key_measurement_fields, for example:

sink {
  IoTDB {
    ...
    key_device = "device_name"
    key_timestamp = "ts"
    key_measurement_fields = ["temperature", "moisture"]
  }
}

The upstream SeaTunnelRow data format is as follows:

| ts | device_name | field_1 | field_2 | temperature | moisture |
|---------------|--------------------------|---------|---------|-------------|----------|
| 1664035200001 | root.test_group.device_a | 1001 | 1002 | 36.1 | 100 |
| 1664035200001 | root.test_group.device_b | 2001 | 2002 | 36.2 | 101 |
| 1664035200001 | root.test_group.device_c | 3001 | 3002 | 36.3 | 102 |

The data written to IoTDB is as follows:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
|                    Time|                  Device|   temperature|   moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|          36.1|        100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|          36.2|        101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|          36.3|        102|
+------------------------+------------------------+--------------+-----------+

Changelog

2.2.0-beta 2022-09-26

  • Add IoTDB Sink Connector

2.3.0-beta 2022-10-20

  • [Improve] Improve IoTDB Sink Connector (2917)
    • Support align by sql syntax
    • Support sql split ignore case
    • Support restore split offset to at-least-once
    • Support read timestamp from RowRecord
  • [BugFix] Fix IoTDB connector sink NPE (3080)