Skip to content

Commit

Permalink
[Feature][scaleph-workspace-seatunnel] upgrade seatunnel cassandra, o…
Browse files Browse the repository at this point in the history
…racle-cdc connectors to 2.3.7 (#753)

* feature: update prometheus operator 部署

* feature: upgrade seatunnel to 2.3.6

* feature: upgrade seatunnel cassandra connector to 2.3.7

* feature: upgrade seatunnel oracle-cdc connector to 2.3.7

---------

Co-authored-by: wangqi <wangqi@xinxuan.net>
  • Loading branch information
kalencaya and wangqi authored Aug 29, 2024
1 parent 7e6b06e commit ad7f1ae
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public enum SeaTunnelPluginMapping {
SOURCE_HTTP(SEATUNNEL, SOURCE, HTTP, "connector-http-base", BETA, BATCH, STREAM, SCHEMA_PROJECTION),
SINK_HTTP(SEATUNNEL, SINK, HTTP, "connector-http-base", BETA),
SINK_FEISHU(SEATUNNEL, SINK, FEISHU, "connector-http-feishu", ALPHA),
@Deprecated
SINK_WECHAT(SEATUNNEL, SINK, WECHAT, "connector-http-wechat", ALPHA),
SINK_DINGTALK(SEATUNNEL, SINK, DINGTALK, "connector-dingtalk", ALPHA),
SOURCE_MYHOURS(SEATUNNEL, SOURCE, MYHOURS, "connector-http-myhours", ALPHA, BATCH, SCHEMA_PROJECTION),
Expand Down Expand Up @@ -86,6 +87,7 @@ public enum SeaTunnelPluginMapping {
SINK_AMAZON_SQS(SEATUNNEL, SINK, AMAZON_SQS, "connector-amazonsqs", UNKNOWN),
SOURCE_RABBITMQ(SEATUNNEL, SOURCE, RABBITMQ, "connector-rabbitmq", BETA, STREAM, EXACTLY_ONCE, SCHEMA_PROJECTION),
SINK_RABBITMQ(SEATUNNEL, SINK, RABBITMQ, "connector-rabbitmq", BETA),
SINK_ACTIVEMQ(SEATUNNEL, SINK, ACTIVEMQ, "connector-activemq", BETA),

SOURCE_JDBC(SEATUNNEL, SOURCE, JDBC, "connector-jdbc", GA, BATCH, SCHEMA_PROJECTION, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),
SINK_JDBC(SEATUNNEL, SINK, JDBC, "connector-jdbc", GA, EXACTLY_ONCE, CDC),
Expand All @@ -101,13 +103,15 @@ public enum SeaTunnelPluginMapping {
SINK_AMAZON_DYNAMODB(SEATUNNEL, SINK, AMAZON_DYNAMODB, "connector-amazondynamodb", BETA),
SOURCE_CASSANDRA(SEATUNNEL, SOURCE, CASSANDRA, "connector-cassandra", BETA, BATCH, SCHEMA_PROJECTION),
SINK_CASSANDRA(SEATUNNEL, SINK, CASSANDRA, "connector-cassandra", BETA),
SOURCE_TABLESTORE(SEATUNNEL, SOURCE, TABLESTORE, "connector-tablestore", ALPHA),
SINK_TABLESTORE(SEATUNNEL, SINK, TABLESTORE, "connector-tablestore", ALPHA),
SINK_GOOGLE_FIRE_STORE(SEATUNNEL, SINK, GOOGLE_FIRE_STORE, "connector-google-firestore", UNKNOWN),

SOURCE_MYSQL_CDC(SEATUNNEL, SOURCE, MYSQL_CDC, "connector-cdc-mysql", GA, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),
SOURCE_SQLSERVER_CDC(SEATUNNEL, SOURCE, SQLSERVER_CDC, "connector-cdc-sqlserver", GA, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),
SOURCE_ORACLE_CDC(SEATUNNEL, SOURCE, ORACLE_CDC, "connector-cdc-oracle", UNKNOWN, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),
SOURCE_POSTGRESQL_CDC(SEATUNNEL, SOURCE, POSTGRESQL_CDC, "connector-cdc-postgres", UNKNOWN, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),
SOURCE_OPENGAUSS_CDC(SEATUNNEL, SOURCE, OPENGAUSS_CDC, "connector-cdc-opengauss", UNKNOWN, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),
SOURCE_MONGODB_CDC(SEATUNNEL, SOURCE, MONGODB_CDC, "connector-cdc-mongodb", UNKNOWN, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),

SOURCE_HIVE(SEATUNNEL, SOURCE, HIVE, "connector-hive", GA, BATCH, EXACTLY_ONCE, SCHEMA_PROJECTION, PARALLELISM),
Expand All @@ -128,9 +132,11 @@ public enum SeaTunnelPluginMapping {
SINK_S3REDSHIFT(SEATUNNEL, SINK, S3REDSHIFT, "connector-s3-redshift", GA, EXACTLY_ONCE),
SOURCE_MAXCOMPUTE(SEATUNNEL, SOURCE, MAXCOMPUTE, "connector-maxcompute", ALPHA, BATCH, PARALLELISM),
SINK_MAXCOMPUTE(SEATUNNEL, SINK, MAXCOMPUTE, "connector-maxcompute", ALPHA),
SOURCE_HBASE(SEATUNNEL, SOURCE, HBASE, "connector-hbase", ALPHA),
SINK_HBASE(SEATUNNEL, SINK, HBASE, "connector-hbase", ALPHA),
SOURCE_KUDU(SEATUNNEL, SOURCE, KUDU, "connector-kudu", BETA, BATCH),
SINK_KUDU(SEATUNNEL, SINK, KUDU, "connector-kudu", BETA),
SINK_DRUID(SEATUNNEL, SINK, DRUID, "connector-druid", BETA),
SOURCE_IOTDB(SEATUNNEL, SOURCE, IOTDB, "connector-iotdb", GA, BATCH, EXACTLY_ONCE, SCHEMA_PROJECTION, PARALLELISM),
SINK_IOTDB(SEATUNNEL, SINK, IOTDB, "connector-iotdb", GA, EXACTLY_ONCE),
SOURCE_OPENMLDB(SEATUNNEL, SOURCE, OPENMLDB, "connector-openmldb", BETA, BATCH, STREAM),
Expand All @@ -140,17 +146,25 @@ public enum SeaTunnelPluginMapping {
SINK_INFLUXDB(SEATUNNEL, SINK, INFLUXDB, "connector-influxdb", BETA),
SOURCE_TDENGINE(SEATUNNEL, SOURCE, TDENGINE, "connector-tdengine", BETA),
SINK_TDENGINE(SEATUNNEL, SINK, TDENGINE, "connector-tdengine", BETA),
SOURCE_SLS(SEATUNNEL, SOURCE, SLS, "connector-sls", BETA),

SOURCE_MILVUS(SEATUNNEL, SOURCE, MILVUS, "connector-milvus", UNKNOWN),
SINK_MILVUS(SEATUNNEL, SINK, MILVUS, "connector-milvus", UNKNOWN),
SOURCE_WEB3J(SEATUNNEL, SOURCE, WEB3J, "connector-web3j", UNKNOWN),

SINK_SENTRY(SEATUNNEL, SINK, SENTRY, "connector-sentry", ALPHA),
SOURCE_GOOGLE_SHEETS(SEATUNNEL, SOURCE, GOOGLE_SHEETS, "connector-google-sheets", UNKNOWN, BATCH, SCHEMA_PROJECTION),

TRANSFORM_COPY(SEATUNNEL, TRANSFORM, COPY, "connector-copy", UNKNOWN),
TRANSFORM_FIELD_MAPPER(SEATUNNEL, TRANSFORM, FIELD_MAPPER, "connector-field-mapper", UNKNOWN),
TRANSFORM_FILTER_ROW_KIND(SEATUNNEL, TRANSFORM, FILTER_ROW_KIND, "connector-field-row-kind", UNKNOWN),
TRANSFORM_FILTER(SEATUNNEL, TRANSFORM, FILTER, "connector-filter", UNKNOWN),
TRANSFORM_REPLACE(SEATUNNEL, TRANSFORM, REPLACE, "connector-replace", UNKNOWN),
TRANSFORM_SPLIT(SEATUNNEL, TRANSFORM, SPLIT, "connector-split", UNKNOWN),
TRANSFORM_SQL(SEATUNNEL, TRANSFORM, SQL, "connector-sql", UNKNOWN),
TRANSFORM_COPY(SEATUNNEL, TRANSFORM, COPY, "seatunnel-transforms-v2", UNKNOWN),
TRANSFORM_FIELD_MAPPER(SEATUNNEL, TRANSFORM, FIELD_MAPPER, "seatunnel-transforms-v2", UNKNOWN),
TRANSFORM_FILTER_ROW_KIND(SEATUNNEL, TRANSFORM, FILTER_ROW_KIND, "seatunnel-transforms-v2", UNKNOWN),
TRANSFORM_FILTER(SEATUNNEL, TRANSFORM, FILTER, "seatunnel-transforms-v2", UNKNOWN),
TRANSFORM_REPLACE(SEATUNNEL, TRANSFORM, REPLACE, "seatunnel-transforms-v2", UNKNOWN),
TRANSFORM_SPLIT(SEATUNNEL, TRANSFORM, SPLIT, "seatunnel-transforms-v2", UNKNOWN),
TRANSFORM_SQL(SEATUNNEL, TRANSFORM, SQL, "seatunnel-transforms-v2", UNKNOWN),
TRANSFORM_JSON_PATH(SEATUNNEL, TRANSFORM, JSON_PATH, "seatunnel-transforms-v2", UNKNOWN),
TRANSFORM_DYNAMIC_COMPILE(SEATUNNEL, TRANSFORM, DYNAMIC_COMPILE, "seatunnel-transforms-v2", UNKNOWN),
TRANSFORM_LLM(SEATUNNEL, TRANSFORM, LLM, "seatunnel-transforms-v2", UNKNOWN),
;

private SeaTunnelEngineType engineType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public enum SeaTunnelPluginName implements DictInstance {
SLACK("SlackSink", "Slack"),
HTTP("Http", "Http"),
FEISHU("Feishu", "Feishu"),

@Deprecated
WECHAT("WeChat", "WeChat"),

Expand Down Expand Up @@ -83,6 +82,7 @@ public enum SeaTunnelPluginName implements DictInstance {
SQLSERVER_CDC("SqlServer-CDC", "SqlServer-CDC"),
ORACLE_CDC("Oracle-CDC", "Oracle-CDC"),
POSTGRESQL_CDC("Postgres-CDC", "PostgreSQL-CDC"),
OPENGAUSS_CDC("Opengauss-CDC", "Opengauss-CDC"),
MONGODB_CDC("MongoDB-CDC", "MongoDB-CDC"),

HIVE("Hive", "Hive"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
*/
package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.cassandra.source;

import cn.sliew.carp.module.datasource.modal.DataSourceInfo;
import cn.sliew.carp.module.datasource.modal.nosql.CassandraDataSourceProperties;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginMapping;
import cn.sliew.scaleph.ds.modal.AbstractDataSource;
import cn.sliew.scaleph.ds.modal.nosql.CassandraDataSource;
import cn.sliew.scaleph.plugin.framework.core.PluginInfo;
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.seatunnel.flink.SeaTunnelConnectorPlugin;
Expand Down Expand Up @@ -63,17 +64,19 @@ public List<ResourceProperty> getRequiredResources() {
public ObjectNode createConf() {
ObjectNode conf = super.createConf();
JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE);
CassandraDataSource dataSource = (CassandraDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode);
conf.putPOJO(HOST.getName(), dataSource.getHost());
conf.putPOJO(KEYSPACE.getName(), dataSource.getKeyspace());
if (StringUtils.hasText(dataSource.getUsername())) {
conf.putPOJO(USERNAME.getName(), dataSource.getUsername());

DataSourceInfo dataSourceInfo = JacksonUtil.toObject(jsonNode, DataSourceInfo.class);
CassandraDataSourceProperties props = (CassandraDataSourceProperties) dataSourceInfo.getProps();
conf.putPOJO(HOST.getName(), props.getHost());
conf.putPOJO(KEYSPACE.getName(), props.getKeyspace());
if (StringUtils.hasText(props.getUsername())) {
conf.putPOJO(USERNAME.getName(), props.getUsername());
}
if (StringUtils.hasText(dataSource.getPassword())) {
conf.putPOJO(PASSWORD.getName(), dataSource.getPassword());
if (StringUtils.hasText(props.getPassword())) {
conf.putPOJO(PASSWORD.getName(), props.getPassword());
}
if (StringUtils.hasText(dataSource.getDatacenter())) {
conf.putPOJO(DATACENTER.getName(), dataSource.getDatacenter());
if (StringUtils.hasText(props.getDatacenter())) {
conf.putPOJO(DATACENTER.getName(), props.getDatacenter());
}
return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public OracleCDCSourcePlugin() {
props.add(PASSWORD);
props.add(DATABASE);
props.add(OracleCDCSourceProperties.SCHEMA);
props.add(OracleCDCSourceProperties.USE_SELECT_COUNT);
props.add(OracleCDCSourceProperties.SKIP_ANALYZE);
props.add(TABLE);
props.add(TABLE_CONFIG);
props.add(STARTUP_MODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,20 @@ public enum OracleCDCSourceProperties {
.parser(Parsers.STRING_ARRAY_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Boolean> USE_SELECT_COUNT = new PropertyDescriptor.Builder()
.name("use_select_count")
.description("Use select count for table count rather then other methods in full stage.In this scenario, select count directly is used when it is faster to update statistics using sql from analysis table")
.type(PropertyType.BOOLEAN)
.parser(Parsers.BOOLEAN_PARSER)
.addValidator(Validators.BOOLEAN_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Boolean> SKIP_ANALYZE = new PropertyDescriptor.Builder()
.name("skip_analyze")
.description("Skip the analysis of table count in full stage.In this scenario, you schedule analysis table sql to update related table statistics periodically or your table data does not change frequently")
.type(PropertyType.BOOLEAN)
.parser(Parsers.BOOLEAN_PARSER)
.addValidator(Validators.BOOLEAN_VALIDATOR)
.validateAndBuild();
}
8 changes: 6 additions & 2 deletions scaleph-ui-react/src/locales/zh-CN/pages/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -793,8 +793,6 @@ export default {
'pages.project.di.step.cdc.password': '密码',
'pages.project.di.step.cdc.databases': '数据库',
'pages.project.di.step.cdc.databases.placeholder': 'db1, db2',
'pages.project.di.step.cdc.schemas': 'Schema',
'pages.project.di.step.cdc.schemas.placeholder': 'DEBEZIUM1, DEBEZIUM2',
'pages.project.di.step.cdc.tables': '表',
'pages.project.di.step.cdc.tables.placeholder': 'table1, table2',
'pages.project.di.step.cdc.tableConfig': '表配置',
Expand Down Expand Up @@ -832,6 +830,12 @@ export default {
'pages.project.di.step.cdc.debeziums.value.placeholder': 'never',
'pages.project.di.step.cdc.format': 'CDC 格式',

// oracle-cdc
'pages.project.di.step.oracle-cdc.schemaNames': 'Schema',
'pages.project.di.step.oracle-cdc.schemaNames.placeholder': 'DEBEZIUM1, DEBEZIUM2',
'pages.project.di.step.oracle-cdc.useSelectCount': '使用 select count()',
'pages.project.di.step.oracle-cdc.skipAnalyze': '跳过表 analysis',

// mongodb-cdc
'pages.project.di.step.mongodb-cdc.hosts': '服务器地址',
'pages.project.di.step.mongodb-cdc.hosts.placeholder': 'localhost:27017,localhost:27018',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,12 @@ export const CDCParams = {
format: 'format',
};

export const OracleCDCParams = {
schemaNames: 'schema-names',
useSelectCount: 'use_select_count',
skipAnalyze: 'skip_analyze',
};

export const MongoDBCDCParams = {
hosts: 'hosts',
username: 'username',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import React, {useEffect} from 'react';
import {Form} from 'antd';
import {InfoCircleOutlined} from "@ant-design/icons";
import {
DrawerForm,
ProFormDigit,
Expand All @@ -15,7 +16,6 @@ import {CassandraParams, STEP_ATTR_TYPE} from '../constant';
import {StepSchemaService} from "../helper";
import DataSourceItem from "../dataSource";
import CommonListItem from "@/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/common/list";
import {InfoCircleOutlined} from "@ant-design/icons";

const SinkCassandraStepForm: React.FC<ModalFormProps<Node>> = ({data, visible, onVisibleChange, onOK}) => {
const intl = getIntl(getLocale());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
import {getIntl, getLocale} from "@umijs/max";
import {Node, XFlow} from '@antv/xflow';
import {ModalFormProps} from '@/typings';
import {CDCParams, STEP_ATTR_TYPE} from '../constant';
import {CDCParams, OracleCDCParams, STEP_ATTR_TYPE} from '../constant';
import {StepSchemaService} from '../helper';
import {DictDataService} from "@/services/admin/dictData.service";
import {DICT_TYPE} from "@/constants/dictType";
Expand Down Expand Up @@ -83,9 +83,9 @@ const SourceCDCOracleStepForm: React.FC<ModalFormProps<Node>> = ({data, visible,
colProps={{span: 8}}
/>
<ProFormText
name={CDCParams.schemas}
label={intl.formatMessage({id: 'pages.project.di.step.cdc.schemas'})}
placeholder={intl.formatMessage({id: 'pages.project.di.step.cdc.schemas.placeholder'})}
name={OracleCDCParams.schemaNames}
label={intl.formatMessage({id: 'pages.project.di.step.oracle-cdc.schemaNames'})}
placeholder={intl.formatMessage({id: 'pages.project.di.step.oracle-cdc.schemaNames.placeholder'})}
colProps={{span: 8}}
/>
<ProFormText
Expand Down Expand Up @@ -298,6 +298,16 @@ const SourceCDCOracleStepForm: React.FC<ModalFormProps<Node>> = ({data, visible,
return DictDataService.listDictDataByType2(DICT_TYPE.seatunnelCDCFormat)
}}
/>
<ProFormSwitch
name={OracleCDCParams.useSelectCount}
label={intl.formatMessage({id: 'pages.project.di.step.oracle-cdc.useSelectCount'})}
colProps={{span: 12}}
/>
<ProFormSwitch
name={OracleCDCParams.skipAnalyze}
label={intl.formatMessage({id: 'pages.project.di.step.oracle-cdc.skipAnalyze'})}
colProps={{span: 12}}
/>
</DrawerForm>
</XFlow>
);
Expand Down

0 comments on commit ad7f1ae

Please sign in to comment.