From 03219ca88bcfbb3ab6511e1d0879722581f65cff Mon Sep 17 00:00:00 2001 From: wangqi Date: Sat, 13 Jul 2024 21:14:46 +0800 Subject: [PATCH] feature: update flink-cdc form --- .../ws/WsArtifactFlinkCDCController.java | 5 ++- .../doris/sink/DorisSinkPlugin.java | 45 +++++++++++++------ .../mysql/source/MySQLSourcePlugin.java | 38 +++++++++++++--- .../starrocks/sink/StarRocksSinkPlugin.java | 29 ++++++++++-- .../generator/MybatisPlusGenerator.java | 4 +- .../Steps/Connector/ConnectorForm.tsx | 15 ++++--- .../Connector/Sink/SinkDorisConnector.tsx | 14 +++--- .../Connector/Sink/SinkKafkaConnector.tsx | 21 ++++----- .../Connector/Sink/SinkStarRocksConnector.tsx | 22 ++++----- .../Connector/Source/SourceKafkaConnector.tsx | 42 ----------------- .../Connector/Source/SourceMySQLConnector.tsx | 36 +++++++-------- .../Steps/New/Config/ConfigStepDataSource.tsx | 4 +- .../FlinkCDC/Steps/New/index.tsx | 3 +- .../project/WsArtifactFlinkCDCService.ts | 7 +-- .../service/WsArtifactFlinkCDCService.java | 3 +- .../impl/WsArtifactFlinkCDCServiceImpl.java | 24 ++-------- 16 files changed, 162 insertions(+), 150 deletions(-) delete mode 100644 scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceKafkaConnector.tsx diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsArtifactFlinkCDCController.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsArtifactFlinkCDCController.java index 1b13429e7..4e50d6b14 100644 --- a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsArtifactFlinkCDCController.java +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsArtifactFlinkCDCController.java @@ -25,6 +25,7 @@ import cn.sliew.scaleph.workspace.flink.cdc.service.dto.WsArtifactFlinkCDCDTO; import cn.sliew.scaleph.workspace.flink.cdc.service.param.*; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.databind.JsonNode; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.validation.Valid; @@ -119,8 +120,8 @@ public ResponseEntity deleteArtifact(@PathVariable("artifactId") Lon @Logging @PostMapping("preview") @Operation(summary = "预览 flink cdc 配置", description = "预览 flink cdc 配置") - public ResponseEntity> previewJob(@RequestBody WsArtifactFlinkCDCDTO dto) throws Exception { - String conf = wsArtifactFlinkCDCService.buildConfig(dto); + public ResponseEntity> previewJob(@RequestBody WsArtifactFlinkCDCDTO dto) throws Exception { + JsonNode conf = wsArtifactFlinkCDCService.buildConfig(dto); return new ResponseEntity<>(ResponseVO.success(conf), HttpStatus.OK); } } \ No newline at end of file diff --git a/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/doris/sink/DorisSinkPlugin.java b/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/doris/sink/DorisSinkPlugin.java index a8fbb805e..763e2e470 100644 --- a/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/doris/sink/DorisSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/doris/sink/DorisSinkPlugin.java @@ -19,10 +19,16 @@ package cn.sliew.scaleph.plugin.flink.cdc.connectors.doris.sink; import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginMapping; +import cn.sliew.scaleph.ds.modal.AbstractDataSource; +import cn.sliew.scaleph.ds.modal.olap.DorisDataSource; import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin; import cn.sliew.scaleph.plugin.flink.cdc.connectors.CommonProperties; import cn.sliew.scaleph.plugin.framework.core.PluginInfo; import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor; +import cn.sliew.scaleph.plugin.framework.resource.ResourceProperties; +import cn.sliew.scaleph.plugin.framework.resource.ResourceProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.service.AutoService; import java.util.ArrayList; @@ -30,6 +36,8 @@ import java.util.List; import static cn.sliew.scaleph.plugin.flink.cdc.connectors.doris.sink.DorisSinkProperties.*; +import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.PASSWORD; +import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.USERNAME; @AutoService(FlinkCDCPipilineConnectorPlugin.class) public class DorisSinkPlugin extends FlinkCDCPipilineConnectorPlugin { @@ -41,22 +49,33 @@ public DorisSinkPlugin() { final List props = new ArrayList<>(); props.add(CommonProperties.NAME); props.add(CommonProperties.TYPE); -// props.add(FENODES); -// props.add(BENODES); -// props.add(JDBC_URL); -// props.add(USERNAME); -// props.add(PASSWORD); -// props.add(AUTO_REDIRECT); -// props.add(SINK_ENABLE_BATCH_MODE); -// props.add(SINK_FLUSH_QUEUE_SIZE); -// props.add(SINK_BUFFER_FLUSH_MAX_ROWS); -// props.add(SINK_BUFFER_FLUSH_MAX_BYTES); -// props.add(SINK_BUFFER_FLUSH_INTERVAL); -// props.add(SINK_PROPERTIES); -// props.add(TABLE_CREATE_PROPERTIES); + props.add(AUTO_REDIRECT); + props.add(SINK_ENABLE_BATCH_MODE); + props.add(SINK_FLUSH_QUEUE_SIZE); + props.add(SINK_BUFFER_FLUSH_MAX_ROWS); + props.add(SINK_BUFFER_FLUSH_MAX_BYTES); + props.add(SINK_BUFFER_FLUSH_INTERVAL); + props.add(SINK_PROPERTIES); + props.add(TABLE_CREATE_PROPERTIES); this.supportedProperties = Collections.unmodifiableList(props); } + @Override + public List getRequiredResources() { + return Collections.singletonList(ResourceProperties.DATASOURCE_RESOURCE); + } + + @Override + public ObjectNode createConf() { + ObjectNode conf = super.createConf(); + JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE); + DorisDataSource dataSource = (DorisDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode); + conf.putPOJO(FENODES.getName(), dataSource.getNodeUrls()); + conf.putPOJO(USERNAME.getName(), dataSource.getUsername()); + conf.putPOJO(PASSWORD.getName(), dataSource.getPassword()); + return conf; + } + @Override protected FlinkCDCPluginMapping getPluginMapping() { return FlinkCDCPluginMapping.SINK_DORIS; diff --git a/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/mysql/source/MySQLSourcePlugin.java b/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/mysql/source/MySQLSourcePlugin.java index 6c8857944..6b24cf025 100644 --- a/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/mysql/source/MySQLSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/mysql/source/MySQLSourcePlugin.java @@ -18,13 +18,22 @@ package cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source; +import cn.sliew.milky.common.exception.Rethrower; import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginMapping; +import cn.sliew.scaleph.ds.modal.AbstractDataSource; +import cn.sliew.scaleph.ds.modal.jdbc.MySQLDataSource; import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin; import cn.sliew.scaleph.plugin.flink.cdc.connectors.CommonProperties; import cn.sliew.scaleph.plugin.framework.core.PluginInfo; import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor; +import cn.sliew.scaleph.plugin.framework.resource.ResourceProperties; +import cn.sliew.scaleph.plugin.framework.resource.ResourceProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.service.AutoService; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -41,11 +50,7 @@ public MySQLSourcePlugin() { final List props = new ArrayList<>(); props.add(CommonProperties.NAME); props.add(CommonProperties.TYPE); -// props.add(HOSTNAME); -// props.add(PORT); -// props.add(USERNAME); -// props.add(PASSWORD); -// props.add(TABLES); + props.add(TABLES); props.add(SCHEMA_CHANGE_ENABLED); props.add(SERVER_ID); props.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); @@ -66,6 +71,29 @@ public MySQLSourcePlugin() { this.supportedProperties = Collections.unmodifiableList(props); } + @Override + public List getRequiredResources() { + return Collections.singletonList(ResourceProperties.DATASOURCE_RESOURCE); + } + + @Override + public ObjectNode createConf() { + try { + ObjectNode conf = super.createConf(); + JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE); + MySQLDataSource dataSource = (MySQLDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode); + URI url = new URI(dataSource.getUrl().replace("jdbc:", "")); + conf.putPOJO(HOSTNAME.getName(), url.getHost()); + conf.putPOJO(PORT.getName(), url.getPort()); + conf.putPOJO(USERNAME.getName(), dataSource.getUser()); + conf.putPOJO(PASSWORD.getName(), dataSource.getPassword()); + return conf; + } catch (URISyntaxException e) { + Rethrower.throwAs(e); + return null; + } + } + @Override protected FlinkCDCPluginMapping getPluginMapping() { return FlinkCDCPluginMapping.SOURCE_MYSQL; diff --git a/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/starrocks/sink/StarRocksSinkPlugin.java b/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/starrocks/sink/StarRocksSinkPlugin.java index cd9dcea4a..86a5d7982 100644 --- a/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/starrocks/sink/StarRocksSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/connectors/starrocks/sink/StarRocksSinkPlugin.java @@ -19,16 +19,24 @@ package cn.sliew.scaleph.plugin.flink.cdc.connectors.starrocks.sink; import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginMapping; +import cn.sliew.scaleph.ds.modal.AbstractDataSource; +import cn.sliew.scaleph.ds.modal.olap.StarRocksDataSource; import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin; import cn.sliew.scaleph.plugin.flink.cdc.connectors.CommonProperties; import cn.sliew.scaleph.plugin.framework.core.PluginInfo; import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor; +import cn.sliew.scaleph.plugin.framework.resource.ResourceProperties; +import cn.sliew.scaleph.plugin.framework.resource.ResourceProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.service.AutoService; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.PASSWORD; +import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.USERNAME; import static cn.sliew.scaleph.plugin.flink.cdc.connectors.starrocks.sink.StarRocksSinkProperties.*; @AutoService(FlinkCDCPipilineConnectorPlugin.class) @@ -41,10 +49,6 @@ public StarRocksSinkPlugin() { final List props = new ArrayList<>(); props.add(CommonProperties.NAME); props.add(CommonProperties.TYPE); - props.add(JDBC_URL); - props.add(LOAD_URL); - props.add(USERNAME); - props.add(PASSWORD); props.add(SINK_LABEL_PREFIX); props.add(SINK_CONNECT_TIMEOUT_MS); props.add(SINK_WAIT_FOR_CONTINUE_TIMEOUT_MS); @@ -60,6 +64,23 @@ public StarRocksSinkPlugin() { this.supportedProperties = Collections.unmodifiableList(props); } + @Override + public List getRequiredResources() { + return Collections.singletonList(ResourceProperties.DATASOURCE_RESOURCE); + } + + @Override + public ObjectNode createConf() { + ObjectNode conf = super.createConf(); + JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE); + StarRocksDataSource dataSource = (StarRocksDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode); + conf.putPOJO(JDBC_URL.getName(), dataSource.getNodeUrls()); + conf.putPOJO(LOAD_URL.getName(), dataSource.getNodeUrls()); + conf.putPOJO(USERNAME.getName(), dataSource.getUsername()); + conf.putPOJO(PASSWORD.getName(), dataSource.getPassword()); + return conf; + } + @Override protected FlinkCDCPluginMapping getPluginMapping() { return FlinkCDCPluginMapping.SINK_STARROCKS; diff --git a/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java b/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java index 60ddc017a..8899e9ec0 100644 --- a/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java +++ b/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java @@ -44,7 +44,7 @@ public class MybatisPlusGenerator { private final static String AUTHOR = "wangqi"; - private final static String URL = "jdbc:mysql://127.0.0.1:3306/scaleph"; + private final static String URL = "jdbc:mysql://127.0.0.1:3306/carp"; private final static String USERNAME = "root"; private final static String PASSWORD = "123456"; //NOSONAR private static final String BASE_PACKAGE = "cn.sliew"; @@ -54,7 +54,7 @@ public class MybatisPlusGenerator { /** * just add table names here and run the {@link #main(String[])} method. */ - private static final String[] TABLES = {"dag_instance", "dag_link", "dag_step"}; + private static final String[] TABLES = {"sec_application"}; public static void main(String[] args) { //自动生成配置 diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/ConnectorForm.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/ConnectorForm.tsx index 124d5b182..7b0540d5a 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/ConnectorForm.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/ConnectorForm.tsx @@ -6,15 +6,16 @@ import SinkDorisConnectorForm from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkDorisConnector"; import SinkStarRocksConnectorForm from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkStarRocksConnector"; -import SourceKafkaConnectorForm - from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceKafkaConnector"; +import SinkKafkaConnectorForm + from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkKafkaConnector"; type ConnectorProps = { + prefix: string; type: string; dsId: number; }; -const FlinkCDCConnectorForm: React.FC = ({type, dsId}) => { +const FlinkCDCConnectorForm: React.FC = ({prefix, type, dsId}) => { const [content, setConstent] = useState(<>) @@ -27,13 +28,13 @@ const FlinkCDCConnectorForm: React.FC = ({type, dsId}) => { DsInfoService.selectOne(dsId).then((response) => { if (response.data) { if (type === 'source' && response.data.dsType.type.value == 'MySQL') { - setConstent() + setConstent() } else if (type === 'sink' && response.data.dsType.type.value === 'Doris') { - setConstent() + setConstent() } else if (type === 'sink' && response.data.dsType.type.value === 'StarRocks') { - setConstent() + setConstent() } else if (type === 'sink' && response.data.dsType.type.value === 'Kafka') { - setConstent() + setConstent() } } }) diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkDorisConnector.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkDorisConnector.tsx index d538d1442..15e679b00 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkDorisConnector.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkDorisConnector.tsx @@ -3,17 +3,17 @@ import {useIntl} from "@umijs/max"; import {ProFormDependency, ProFormDigit, ProFormGroup, ProFormSwitch, ProFormText} from "@ant-design/pro-components"; import {DorisParams} from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/constant"; -const SinkDorisConnectorForm: React.FC = () => { +const SinkDorisConnectorForm: React.FC<{prefix: string}> = ({prefix}) => { const intl = useIntl(); return ( @@ -22,7 +22,7 @@ const SinkDorisConnectorForm: React.FC = () => { if (sinkEnableBatchModeParam) { return { }} /> { }} /> { }} /> { +const SinkKafkaConnectorForm: React.FC<{prefix: string}> = ({prefix}) => { const intl = useIntl(); return ( ); }; -export default SourceKafkaConnectorForm; +export default SinkKafkaConnectorForm; diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkStarRocksConnector.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkStarRocksConnector.tsx index a787a20ef..9a3cac06b 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkStarRocksConnector.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkStarRocksConnector.tsx @@ -3,22 +3,22 @@ import {useIntl} from "@umijs/max"; import {ProFormDigit, ProFormGroup, ProFormSwitch, ProFormText} from "@ant-design/pro-components"; import {StarRocksParams} from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/constant"; -const SinkStarRocksConnectorForm: React.FC = () => { +const SinkStarRocksConnectorForm: React.FC<{prefix: string}> = ({prefix}) => { const intl = useIntl(); return ( { }} /> { }} /> { }} /> { }} /> { }} /> { }} /> diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceKafkaConnector.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceKafkaConnector.tsx deleted file mode 100644 index 5e679a848..000000000 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceKafkaConnector.tsx +++ /dev/null @@ -1,42 +0,0 @@ -import React from 'react'; -import {useIntl} from "@umijs/max"; -import {ProFormDigit, ProFormGroup, ProFormSelect, ProFormSwitch, ProFormText} from "@ant-design/pro-components"; -import { - KafkaParams, - StarRocksParams -} from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/constant"; - -const SourceKafkaConnectorForm: React.FC = () => { - const intl = useIntl(); - - return ( - - - - - - - - ); -}; - -export default SourceKafkaConnectorForm; diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceMySQLConnector.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceMySQLConnector.tsx index cf62a6027..d0681b418 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceMySQLConnector.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceMySQLConnector.tsx @@ -10,26 +10,26 @@ import { } from "@ant-design/pro-components"; import {MySQLParams} from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/constant"; -const SourceMySQLConnectorForm: React.FC = () => { +const SourceMySQLConnectorForm: React.FC<{ prefix: string }> = ({prefix}) => { const intl = useIntl(); return ( { }} /> { }} /> { if (scanStartupMode == 'specific-offset') { return { }} /> @@ -89,7 +89,7 @@ const SourceMySQLConnectorForm: React.FC = () => { { }} /> { defaultCollapsed={true} > { }} /> { /> diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/Config/ConfigStepDataSource.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/Config/ConfigStepDataSource.tsx index 6b18c9a85..1e9b62338 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/Config/ConfigStepDataSource.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/Config/ConfigStepDataSource.tsx @@ -74,7 +74,7 @@ const DataIntegrationFlinkCDCStepConfigDataSource: React.FC = () => { return {intl.formatMessage({id: 'pages.project.di.flink-cdc.step.config.dataSource.fromDsConfig'})} - + }} @@ -84,7 +84,7 @@ const DataIntegrationFlinkCDCStepConfigDataSource: React.FC = () => { return {intl.formatMessage({id: 'pages.project.di.flink-cdc.step.config.dataSource.toDsConfig'})} - + }} diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/index.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/index.tsx index 693ba58f5..01947b5ad 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/index.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/index.tsx @@ -33,6 +33,7 @@ const DataIntegrationFlinkCDCNewSteps: React.FC = (props: any) => { const onConfigStepFinish = (values: Record) => { try { + console.log('onConfigStepFinish', values) const instance: WsArtifactFlinkCDC = WsArtifactFlinkCDCService.formatData(props.flinkCDCSteps.instance, values) editFlinkCDCConfig(instance) } catch (unused) { @@ -68,7 +69,7 @@ const DataIntegrationFlinkCDCNewSteps: React.FC = (props: any) => { transform: values.transform, route: values.route } - return WsArtifactFlinkCDCService.add(param); + return WsArtifactFlinkCDCService.add(param).then(response => response.success); }} > { - return request>(`${WsArtifactFlinkCDCService.url}/preview`, { + return request>(`${WsArtifactFlinkCDCService.url}/preview`, { method: 'POST', data: param }); @@ -99,7 +98,9 @@ export const WsArtifactFlinkCDCService = { formatData: (data: WsArtifactFlinkCDC, value: Record) => { data.fromDsId = value.fromDsId + data.fromDsConfig = value.fromDsConfig data.toDsId = value.toDsId + data.toDsConfig = value.toDsConfig data.transform = value.transform data.route = value.route return data; diff --git a/scaleph-workspace/scaleph-workspace-flink-cdc/src/main/java/cn/sliew/scaleph/workspace/flink/cdc/service/WsArtifactFlinkCDCService.java b/scaleph-workspace/scaleph-workspace-flink-cdc/src/main/java/cn/sliew/scaleph/workspace/flink/cdc/service/WsArtifactFlinkCDCService.java index 068fdae3c..bf939df6e 100644 --- a/scaleph-workspace/scaleph-workspace-flink-cdc/src/main/java/cn/sliew/scaleph/workspace/flink/cdc/service/WsArtifactFlinkCDCService.java +++ b/scaleph-workspace/scaleph-workspace-flink-cdc/src/main/java/cn/sliew/scaleph/workspace/flink/cdc/service/WsArtifactFlinkCDCService.java @@ -22,6 +22,7 @@ import cn.sliew.scaleph.workspace.flink.cdc.service.dto.WsArtifactFlinkCDCDTO; import cn.sliew.scaleph.workspace.flink.cdc.service.param.*; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.databind.JsonNode; import java.util.List; import java.util.Optional; @@ -40,7 +41,7 @@ public interface WsArtifactFlinkCDCService { WsArtifactFlinkCDCDTO selectCurrent(Long artifactId); - String buildConfig(WsArtifactFlinkCDCDTO dto) throws Exception; + JsonNode buildConfig(WsArtifactFlinkCDCDTO dto) throws Exception; WsArtifactFlinkCDCDTO insert(WsArtifactFlinkCDCAddParam param); diff --git a/scaleph-workspace/scaleph-workspace-flink-cdc/src/main/java/cn/sliew/scaleph/workspace/flink/cdc/service/impl/WsArtifactFlinkCDCServiceImpl.java b/scaleph-workspace/scaleph-workspace-flink-cdc/src/main/java/cn/sliew/scaleph/workspace/flink/cdc/service/impl/WsArtifactFlinkCDCServiceImpl.java index 581da7bb3..1b1ac279e 100644 --- a/scaleph-workspace/scaleph-workspace-flink-cdc/src/main/java/cn/sliew/scaleph/workspace/flink/cdc/service/impl/WsArtifactFlinkCDCServiceImpl.java +++ b/scaleph-workspace/scaleph-workspace-flink-cdc/src/main/java/cn/sliew/scaleph/workspace/flink/cdc/service/impl/WsArtifactFlinkCDCServiceImpl.java @@ -37,8 +37,7 @@ import cn.sliew.scaleph.plugin.flink.cdc.pipeline.PipelineProperties; import cn.sliew.scaleph.plugin.flink.cdc.util.FlinkCDCPluginUtil; import cn.sliew.scaleph.plugin.framework.exception.PluginException; -import cn.sliew.scaleph.plugin.framework.resource.ResourceProperty; -import cn.sliew.scaleph.resource.service.ResourceService; +import cn.sliew.scaleph.plugin.framework.resource.ResourceProperties; import cn.sliew.scaleph.workspace.flink.cdc.service.FlinkCDCConnectorService; import cn.sliew.scaleph.workspace.flink.cdc.service.WsArtifactFlinkCDCService; import cn.sliew.scaleph.workspace.flink.cdc.service.convert.WsArtifactFlinkCDCConvert; @@ -52,7 +51,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; import org.apache.commons.lang3.EnumUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -67,9 +65,6 @@ @Service public class WsArtifactFlinkCDCServiceImpl implements WsArtifactFlinkCDCService { - // todo 迁移到 JacksonUtil - private YAMLMapper yamlMapper = new YAMLMapper(); - @Autowired private WsArtifactFlinkCDCMapper wsArtifactFlinkCDCMapper; @Autowired @@ -77,8 +72,6 @@ public class WsArtifactFlinkCDCServiceImpl implements WsArtifactFlinkCDCService @Autowired private FlinkCDCConnectorService flinkCDCConnectorService; @Autowired - private ResourceService resourceService; - @Autowired private DsInfoService dsInfoService; @Override @@ -134,7 +127,7 @@ public WsArtifactFlinkCDCDTO selectCurrent(Long artifactId) { } @Override - public String buildConfig(WsArtifactFlinkCDCDTO dto) throws Exception { + public JsonNode buildConfig(WsArtifactFlinkCDCDTO dto) throws Exception { ObjectNode conf = JacksonUtil.createObjectNode(); conf.set("pipeline", buildPipeline(dto)); if (dto.getFromDsId() != null) { @@ -145,7 +138,7 @@ public String buildConfig(WsArtifactFlinkCDCDTO dto) throws Exception { } conf.set("transform", dto.getTransform()); conf.set("route", dto.getRoute()); - return yamlMapper.writeValueAsString(conf); + return conf; } private ObjectNode buildPipeline(WsArtifactFlinkCDCDTO dto) { @@ -187,16 +180,7 @@ private Properties mergeJobAttrs(FlinkCDCPluginType pluginType, FlinkCDCPluginNa })); properties.put(CommonProperties.TYPE.getName(), pluginName.getValue()); properties.put(CommonProperties.NAME.getName(), dsInfoDTO.getName()); - FlinkCDCPipilineConnectorPlugin connector = flinkCDCConnectorService.getConnector(pluginType, pluginName); - for (ResourceProperty resource : connector.getRequiredResources()) { - String name = resource.getProperty().getName(); - if (properties.containsKey(name)) { - Object property = properties.get(name); - // fixme force conform property to resource id - Object value = resourceService.getRaw(resource.getType(), Long.valueOf(property.toString())); - properties.put(name, JacksonUtil.toJsonString(value)); - } - } + properties.put(ResourceProperties.DATASOURCE.getName(), JacksonUtil.toJsonString(dsInfoDTO)); return properties; }