Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][scaleph-workspace-flinkcdc] update flink-cdc module #741

Merged
merged 1 commit into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.sliew.scaleph.workspace.flink.cdc.service.dto.WsArtifactFlinkCDCDTO;
import cn.sliew.scaleph.workspace.flink.cdc.service.param.*;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.validation.Valid;
Expand Down Expand Up @@ -119,8 +120,8 @@ public ResponseEntity<ResponseVO> deleteArtifact(@PathVariable("artifactId") Lon
@Logging
@PostMapping("preview")
@Operation(summary = "预览 flink cdc 配置", description = "预览 flink cdc 配置")
public ResponseEntity<ResponseVO<String>> previewJob(@RequestBody WsArtifactFlinkCDCDTO dto) throws Exception {
String conf = wsArtifactFlinkCDCService.buildConfig(dto);
public ResponseEntity<ResponseVO<JsonNode>> previewJob(@RequestBody WsArtifactFlinkCDCDTO dto) throws Exception {
JsonNode conf = wsArtifactFlinkCDCService.buildConfig(dto);
return new ResponseEntity<>(ResponseVO.success(conf), HttpStatus.OK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@
package cn.sliew.scaleph.plugin.flink.cdc.connectors.doris.sink;

import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginMapping;
import cn.sliew.scaleph.ds.modal.AbstractDataSource;
import cn.sliew.scaleph.ds.modal.olap.DorisDataSource;
import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin;
import cn.sliew.scaleph.plugin.flink.cdc.connectors.CommonProperties;
import cn.sliew.scaleph.plugin.framework.core.PluginInfo;
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperties;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static cn.sliew.scaleph.plugin.flink.cdc.connectors.doris.sink.DorisSinkProperties.*;
import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.PASSWORD;
import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.USERNAME;

@AutoService(FlinkCDCPipilineConnectorPlugin.class)
public class DorisSinkPlugin extends FlinkCDCPipilineConnectorPlugin {
Expand All @@ -41,22 +49,33 @@ public DorisSinkPlugin() {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CommonProperties.NAME);
props.add(CommonProperties.TYPE);
// props.add(FENODES);
// props.add(BENODES);
// props.add(JDBC_URL);
// props.add(USERNAME);
// props.add(PASSWORD);
// props.add(AUTO_REDIRECT);
// props.add(SINK_ENABLE_BATCH_MODE);
// props.add(SINK_FLUSH_QUEUE_SIZE);
// props.add(SINK_BUFFER_FLUSH_MAX_ROWS);
// props.add(SINK_BUFFER_FLUSH_MAX_BYTES);
// props.add(SINK_BUFFER_FLUSH_INTERVAL);
// props.add(SINK_PROPERTIES);
// props.add(TABLE_CREATE_PROPERTIES);
props.add(AUTO_REDIRECT);
props.add(SINK_ENABLE_BATCH_MODE);
props.add(SINK_FLUSH_QUEUE_SIZE);
props.add(SINK_BUFFER_FLUSH_MAX_ROWS);
props.add(SINK_BUFFER_FLUSH_MAX_BYTES);
props.add(SINK_BUFFER_FLUSH_INTERVAL);
props.add(SINK_PROPERTIES);
props.add(TABLE_CREATE_PROPERTIES);
this.supportedProperties = Collections.unmodifiableList(props);
}

@Override
public List<ResourceProperty> getRequiredResources() {
return Collections.singletonList(ResourceProperties.DATASOURCE_RESOURCE);
}

@Override
public ObjectNode createConf() {
ObjectNode conf = super.createConf();
JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE);
DorisDataSource dataSource = (DorisDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode);
conf.putPOJO(FENODES.getName(), dataSource.getNodeUrls());
conf.putPOJO(USERNAME.getName(), dataSource.getUsername());
conf.putPOJO(PASSWORD.getName(), dataSource.getPassword());
return conf;
}

@Override
protected FlinkCDCPluginMapping getPluginMapping() {
return FlinkCDCPluginMapping.SINK_DORIS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@

package cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source;

import cn.sliew.milky.common.exception.Rethrower;
import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginMapping;
import cn.sliew.scaleph.ds.modal.AbstractDataSource;
import cn.sliew.scaleph.ds.modal.jdbc.MySQLDataSource;
import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin;
import cn.sliew.scaleph.plugin.flink.cdc.connectors.CommonProperties;
import cn.sliew.scaleph.plugin.framework.core.PluginInfo;
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperties;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -41,11 +50,7 @@ public MySQLSourcePlugin() {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CommonProperties.NAME);
props.add(CommonProperties.TYPE);
// props.add(HOSTNAME);
// props.add(PORT);
// props.add(USERNAME);
// props.add(PASSWORD);
// props.add(TABLES);
props.add(TABLES);
props.add(SCHEMA_CHANGE_ENABLED);
props.add(SERVER_ID);
props.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
Expand All @@ -66,6 +71,29 @@ public MySQLSourcePlugin() {
this.supportedProperties = Collections.unmodifiableList(props);
}

@Override
public List<ResourceProperty> getRequiredResources() {
return Collections.singletonList(ResourceProperties.DATASOURCE_RESOURCE);
}

@Override
public ObjectNode createConf() {
try {
ObjectNode conf = super.createConf();
JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE);
MySQLDataSource dataSource = (MySQLDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode);
URI url = new URI(dataSource.getUrl().replace("jdbc:", ""));
conf.putPOJO(HOSTNAME.getName(), url.getHost());
conf.putPOJO(PORT.getName(), url.getPort());
conf.putPOJO(USERNAME.getName(), dataSource.getUser());
conf.putPOJO(PASSWORD.getName(), dataSource.getPassword());
return conf;
} catch (URISyntaxException e) {
Rethrower.throwAs(e);
return null;
}
}

@Override
protected FlinkCDCPluginMapping getPluginMapping() {
return FlinkCDCPluginMapping.SOURCE_MYSQL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,24 @@
package cn.sliew.scaleph.plugin.flink.cdc.connectors.starrocks.sink;

import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginMapping;
import cn.sliew.scaleph.ds.modal.AbstractDataSource;
import cn.sliew.scaleph.ds.modal.olap.StarRocksDataSource;
import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin;
import cn.sliew.scaleph.plugin.flink.cdc.connectors.CommonProperties;
import cn.sliew.scaleph.plugin.framework.core.PluginInfo;
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperties;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.PASSWORD;
import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.USERNAME;
import static cn.sliew.scaleph.plugin.flink.cdc.connectors.starrocks.sink.StarRocksSinkProperties.*;

@AutoService(FlinkCDCPipilineConnectorPlugin.class)
Expand All @@ -41,10 +49,6 @@ public StarRocksSinkPlugin() {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CommonProperties.NAME);
props.add(CommonProperties.TYPE);
props.add(JDBC_URL);
props.add(LOAD_URL);
props.add(USERNAME);
props.add(PASSWORD);
props.add(SINK_LABEL_PREFIX);
props.add(SINK_CONNECT_TIMEOUT_MS);
props.add(SINK_WAIT_FOR_CONTINUE_TIMEOUT_MS);
Expand All @@ -60,6 +64,23 @@ public StarRocksSinkPlugin() {
this.supportedProperties = Collections.unmodifiableList(props);
}

@Override
public List<ResourceProperty> getRequiredResources() {
return Collections.singletonList(ResourceProperties.DATASOURCE_RESOURCE);
}

@Override
public ObjectNode createConf() {
ObjectNode conf = super.createConf();
JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE);
StarRocksDataSource dataSource = (StarRocksDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode);
conf.putPOJO(JDBC_URL.getName(), dataSource.getNodeUrls());
conf.putPOJO(LOAD_URL.getName(), dataSource.getNodeUrls());
conf.putPOJO(USERNAME.getName(), dataSource.getUsername());
conf.putPOJO(PASSWORD.getName(), dataSource.getPassword());
return conf;
}

@Override
protected FlinkCDCPluginMapping getPluginMapping() {
return FlinkCDCPluginMapping.SINK_STARROCKS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
public class MybatisPlusGenerator {

private final static String AUTHOR = "wangqi";
private final static String URL = "jdbc:mysql://127.0.0.1:3306/scaleph";
private final static String URL = "jdbc:mysql://127.0.0.1:3306/carp";
private final static String USERNAME = "root";
private final static String PASSWORD = "123456"; //NOSONAR
private static final String BASE_PACKAGE = "cn.sliew";
Expand All @@ -54,7 +54,7 @@ public class MybatisPlusGenerator {
/**
* just add table names here and run the {@link #main(String[])} method.
*/
private static final String[] TABLES = {"dag_instance", "dag_link", "dag_step"};
private static final String[] TABLES = {"sec_application"};

public static void main(String[] args) {
//自动生成配置
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import SinkDorisConnectorForm
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkDorisConnector";
import SinkStarRocksConnectorForm
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkStarRocksConnector";
import SourceKafkaConnectorForm
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceKafkaConnector";
import SinkKafkaConnectorForm
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkKafkaConnector";

type ConnectorProps = {
prefix: string;
type: string;
dsId: number;
};

const FlinkCDCConnectorForm: React.FC<ConnectorProps> = ({type, dsId}) => {
const FlinkCDCConnectorForm: React.FC<ConnectorProps> = ({prefix, type, dsId}) => {

const [content, setConstent] = useState(<></>)

Expand All @@ -27,13 +28,13 @@ const FlinkCDCConnectorForm: React.FC<ConnectorProps> = ({type, dsId}) => {
DsInfoService.selectOne(dsId).then((response) => {
if (response.data) {
if (type === 'source' && response.data.dsType.type.value == 'MySQL') {
setConstent(<SourceMySQLConnectorForm/>)
setConstent(<SourceMySQLConnectorForm prefix={prefix} />)
} else if (type === 'sink' && response.data.dsType.type.value === 'Doris') {
setConstent(<SinkDorisConnectorForm/>)
setConstent(<SinkDorisConnectorForm prefix={prefix} />)
} else if (type === 'sink' && response.data.dsType.type.value === 'StarRocks') {
setConstent(<SinkStarRocksConnectorForm/>)
setConstent(<SinkStarRocksConnectorForm prefix={prefix} />)
} else if (type === 'sink' && response.data.dsType.type.value === 'Kafka') {
setConstent(<SinkStarRocksConnectorForm/>)
setConstent(<SinkKafkaConnectorForm prefix={prefix} />)
}
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ import {useIntl} from "@umijs/max";
import {ProFormDependency, ProFormDigit, ProFormGroup, ProFormSwitch, ProFormText} from "@ant-design/pro-components";
import {DorisParams} from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/constant";

const SinkDorisConnectorForm: React.FC = () => {
const SinkDorisConnectorForm: React.FC<{prefix: string}> = ({prefix}) => {
const intl = useIntl();

return (
<ProFormGroup>
<ProFormSwitch
name={DorisParams.autoRedirect}
name={[prefix, DorisParams.autoRedirect]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.autoRedirect'})}
/>
<ProFormSwitch
name={DorisParams.sinkEnableBatchModeParam}
name={[prefix, DorisParams.sinkEnableBatchModeParam]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.sinkEnableBatchMode'})}
initialValue={true}
/>
Expand All @@ -22,15 +22,15 @@ const SinkDorisConnectorForm: React.FC = () => {
if (sinkEnableBatchModeParam) {
return <ProFormGroup>
<ProFormDigit
name={DorisParams.sinkFlushQueueSize}
name={[prefix, DorisParams.sinkFlushQueueSize]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.sinkFlushQueueSize'})}
initialValue={2}
fieldProps={{
min: 1
}}
/>
<ProFormDigit
name={DorisParams.sinkBufferFlushMaxRows}
name={[prefix, DorisParams.sinkBufferFlushMaxRows]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.sinkBufferFlushMaxRows'})}
colProps={{span: 8}}
initialValue={50000}
Expand All @@ -39,7 +39,7 @@ const SinkDorisConnectorForm: React.FC = () => {
}}
/>
<ProFormDigit
name={DorisParams.sinkBufferFlushMaxBytes}
name={[prefix, DorisParams.sinkBufferFlushMaxBytes]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.sinkBufferFlushMaxBytes'})}
colProps={{span: 8}}
initialValue={1024 * 1024 * 10}
Expand All @@ -48,7 +48,7 @@ const SinkDorisConnectorForm: React.FC = () => {
}}
/>
<ProFormText
name={DorisParams.sinkBufferFlushInterval}
name={[prefix, DorisParams.sinkBufferFlushInterval]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.sinkBufferFlushInterval'})}
colProps={{span: 8}}
initialValue={'10s'}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,39 @@
import React from 'react';
import {useIntl} from "@umijs/max";
import {ProFormDigit, ProFormGroup, ProFormSelect, ProFormSwitch, ProFormText} from "@ant-design/pro-components";
import {
KafkaParams,
StarRocksParams
} from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/constant";
import {ProFormGroup, ProFormSelect, ProFormSwitch, ProFormText} from "@ant-design/pro-components";
import {KafkaParams} from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/constant";

const SourceKafkaConnectorForm: React.FC = () => {
const SinkKafkaConnectorForm: React.FC<{prefix: string}> = ({prefix}) => {
const intl = useIntl();

return (
<ProFormGroup>
<ProFormText
name={KafkaParams.propertiesBootstrapServers}
name={[prefix, KafkaParams.propertiesBootstrapServers]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.kafka.propertiesBootstrapServers'})}
rules={[{required: true}]}
/>
<ProFormText
name={KafkaParams.topic}
name={[prefix, KafkaParams.topic]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.kafka.topic'})}
/>
<ProFormSelect
name={KafkaParams.valueFormat}
name={[prefix, KafkaParams.valueFormat]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.kafka.valueFormat'})}
allowClear={false}
initialValue={'debezium-json'}
options={['debezium-json', 'canal-json']}
/>
<ProFormSwitch
name={KafkaParams.sinkAddTableIdToHeaderEnabled}
name={[prefix, KafkaParams.sinkAddTableIdToHeaderEnabled]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.kafka.sinkAddTableIdToHeaderEnabled'})}
/>
<ProFormText
name={KafkaParams.sinkCustomHeader}
name={[prefix, KafkaParams.sinkCustomHeader]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.kafka.sinkCustomHeader'})}
/>
</ProFormGroup>
);
};

export default SourceKafkaConnectorForm;
export default SinkKafkaConnectorForm;
Loading
Loading