From fb3204bc96a537dcd655b19b37555e35981ba42b Mon Sep 17 00:00:00 2001 From: wangqi Date: Tue, 4 Jun 2024 20:56:41 +0800 Subject: [PATCH 1/8] feature: add flink kubernetes operator deploy values --- ...elease-manual-docker-flink-kubernetes-operator.yml | 11 ++++++++++- .../docker/build/flink-kubernetes-operator/Dockerfile | 4 +++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/.github/workflows/release-manual-docker-flink-kubernetes-operator.yml b/.github/workflows/release-manual-docker-flink-kubernetes-operator.yml index 9a5928db5..764d9df6b 100644 --- a/.github/workflows/release-manual-docker-flink-kubernetes-operator.yml +++ b/.github/workflows/release-manual-docker-flink-kubernetes-operator.yml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -name: Release-Docker-Flink-CDC +name: Release-Docker-Flink-Kubernetes-Operator on: workflow_dispatch: @@ -26,10 +26,18 @@ on: type: choice options: - 1.8.0 + flinkVersion: + description: 'flink version' + required: true + default: '1.18.1' + type: choice + options: + - 1.18.1 env: HUB: ghcr.io/flowerfine/flink-kubernetes-operator FLINK_KUBERNETES_OPERATOR_VERSION: ${{ inputs.flinkKubernetesOperatorVersion }} + FLINK_VERSION: ${{ inputs.flinkVersion }} jobs: build: @@ -65,6 +73,7 @@ jobs: context: . build-args: | FLINK_KUBERNETES_OPERATOR_VERSION=${{ env.FLINK_KUBERNETES_OPERATOR_VERSION }} + FLINK_VERSION=${{ env.FLINK_VERSION }} platforms: linux/amd64,linux/arm64 file: tools/docker/build/flink-kubernetes-operator/Dockerfile tags: ${{ env.HUB }}:${{ env.FLINK_KUBERNETES_OPERATOR_VERSION }} diff --git a/tools/docker/build/flink-kubernetes-operator/Dockerfile b/tools/docker/build/flink-kubernetes-operator/Dockerfile index b431e2de9..c4bf6e7ea 100644 --- a/tools/docker/build/flink-kubernetes-operator/Dockerfile +++ b/tools/docker/build/flink-kubernetes-operator/Dockerfile @@ -17,5 +17,7 @@ ARG FLINK_KUBERNETES_OPERATOR_VERSION=1.8.0 FROM apache/flink-kubernetes-operator:${FLINK_KUBERNETES_OPERATOR_VERSION} as release +ARG FLINK_VERSION=1.18.1 + RUN mkdir /opt/flink/plugins/flink-s3-fs-hadoop && \ - wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.18.1/flink-s3-fs-hadoop-1.18.1.jar -O /opt/flink/plugins/flink-s3-fs-hadoop/flink-s3-fs-hadoop.jar \ No newline at end of file + wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-s3-fs-hadoop/${FLINK_VERSION}/flink-s3-fs-hadoop-${FLINK_VERSION}.jar -O /opt/flink/plugins/flink-s3-fs-hadoop/flink-s3-fs-hadoop.jar \ No newline at end of file From a24e8206850f7f208212bdfcc8f7863d606260b8 Mon Sep 17 00:00:00 2001 From: wangqi Date: Tue, 4 Jun 2024 21:49:09 +0800 Subject: [PATCH 2/8] feature: add flink kubernetes operator deploy values --- ...inkKubernetesSessionClusterController.java | 2 +- .../job/instance/MetadataHandler.java | 27 ++++++++ .../FlinkSessionClusterConverterFactory.java | 41 +++++++++++ .../FlinkSessionClusterSpecHandler.java | 68 +++++++++++++++++++ .../handler/FileSystemPluginHandler.java | 25 +++++-- .../handler/FlinkMainContainerHandler.java | 27 +++++--- .../resource/handler/FlinkVersionMapping.java | 2 +- .../resource/handler/LoggingHandler.java | 11 +++ .../resource/handler/PodTemplateHandler.java | 7 ++ ...sFlinkKubernetesSessionClusterService.java | 2 +- ...nkKubernetesSessionClusterServiceImpl.java | 26 +++---- tools/kubernetes/flink/values.yaml | 2 +- 12 files changed, 207 insertions(+), 33 deletions(-) create mode 100644 scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/sessioncluster/FlinkSessionClusterConverterFactory.java create mode 100644 scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/sessioncluster/FlinkSessionClusterSpecHandler.java diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkKubernetesSessionClusterController.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkKubernetesSessionClusterController.java index 3bbcff680..ffb0cd126 100644 --- a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkKubernetesSessionClusterController.java +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkKubernetesSessionClusterController.java @@ -79,7 +79,7 @@ public ResponseEntity> selectOne( @PostMapping("asYAML") @Operation(summary = "转换 SessionCluster", description = "转换 SessionCluster") public ResponseEntity> asYAML(@RequestBody WsFlinkKubernetesSessionClusterDTO dto) { - FlinkSessionCluster sessionCluster = wsFlinkKubernetesSessionClusterService.asYAML(dto); + FlinkSessionCluster sessionCluster = wsFlinkKubernetesSessionClusterService.asYaml(dto); return new ResponseEntity(ResponseVO.success(sessionCluster), HttpStatus.OK); } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/MetadataHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/MetadataHandler.java index edefe55cf..55d3ee47f 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/MetadataHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/MetadataHandler.java @@ -20,6 +20,7 @@ import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO; +import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO; import cn.sliew.scaleph.config.kubernetes.resource.ResourceLabels; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; @@ -42,11 +43,26 @@ public ObjectMeta handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, ObjectM return builder.build(); } + public ObjectMeta handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, ObjectMeta objectMeta) { + ObjectMetaBuilder builder = Optional.ofNullable(objectMeta) + .map(meta -> new ObjectMetaBuilder(meta)) + .orElse(new ObjectMetaBuilder()); + + builder.withName(sessionClusterDTO.getSessionClusterId()); + addSessionClusterLables(sessionClusterDTO, builder); + return builder.build(); + } + private void addJobLables(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, ObjectMetaBuilder builder) { Map lables = generateLables(jobInstanceDTO); builder.addToLabels(lables); } + private void addSessionClusterLables(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, ObjectMetaBuilder builder) { + Map lables = generateLables(sessionClusterDTO); + builder.addToLabels(lables); + } + public Map generateLables(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) { Map labels = new HashMap<>(); WsFlinkKubernetesJobDTO jobDTO = jobInstanceDTO.getWsFlinkKubernetesJob(); @@ -63,4 +79,15 @@ public Map generateLables(WsFlinkKubernetesJobInstanceDTO jobIns return labels; } + public Map generateLables(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO) { + Map labels = new HashMap<>(); + + labels.put(ResourceLabels.SCALEPH_LABEL_PLATFROM, ResourceLabels.SCALEPH); + labels.put(ResourceLabels.SCALEPH_LABEL_NAME, sessionClusterDTO.getName()); + labels.put(ResourceLabels.SCALEPH_LABEL_SESSION_CLUSTER_ID, sessionClusterDTO.getSessionClusterId()); + labels.put(ResourceLabels.SCALEPH_LABEL_CREATOR, sessionClusterDTO.getCreator()); + labels.put(ResourceLabels.SCALEPH_LABEL_EDITOR, sessionClusterDTO.getEditor()); + return labels; + } + } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/sessioncluster/FlinkSessionClusterConverterFactory.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/sessioncluster/FlinkSessionClusterConverterFactory.java new file mode 100644 index 000000000..29cb68504 --- /dev/null +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/sessioncluster/FlinkSessionClusterConverterFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.application.flink.resource.definition.sessioncluster; + +import cn.sliew.scaleph.application.flink.resource.definition.job.instance.MetadataHandler; +import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class FlinkSessionClusterConverterFactory { + + @Autowired + private MetadataHandler metadataHandlerl; + @Autowired + private FlinkSessionClusterSpecHandler flinkSessionClusterSpecHandler; + + public FlinkSessionCluster convert(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO) { + FlinkSessionCluster flinkSessionCluster = FlinkSessionClusterConverter.INSTANCE.convertTo(sessionClusterDTO); + flinkSessionCluster.setMetadata(metadataHandlerl.handle(sessionClusterDTO, flinkSessionCluster.getMetadata())); + flinkSessionCluster.setSpec(flinkSessionClusterSpecHandler.handle(sessionClusterDTO, flinkSessionCluster.getSpec())); + return flinkSessionCluster; + } + +} diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/sessioncluster/FlinkSessionClusterSpecHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/sessioncluster/FlinkSessionClusterSpecHandler.java new file mode 100644 index 000000000..da424869f --- /dev/null +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/sessioncluster/FlinkSessionClusterSpecHandler.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.application.flink.resource.definition.sessioncluster; + +import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec; +import cn.sliew.scaleph.application.flink.resource.handler.FileSystemPluginHandler; +import cn.sliew.scaleph.application.flink.resource.handler.FlinkMainContainerHandler; +import cn.sliew.scaleph.application.flink.resource.handler.LoggingHandler; +import cn.sliew.scaleph.application.flink.resource.handler.PodTemplateHandler; +import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +@Component +public class FlinkSessionClusterSpecHandler { + + @Autowired + private FileSystemPluginHandler fileSystemPluginHandler; + @Autowired + private LoggingHandler loggingHandler; + @Autowired + private FlinkMainContainerHandler flinkMainContainerHandler; + @Autowired + private PodTemplateHandler podTemplateHandler; + + public FlinkSessionClusterSpec handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec flinkSessionClusterSpec) { + FlinkSessionClusterSpec spec = Optional.ofNullable(flinkSessionClusterSpec).orElse(new FlinkSessionClusterSpec()); + setPodTemplate(sessionClusterDTO, spec); + enableFileSystem(sessionClusterDTO, spec); + addLogging(sessionClusterDTO, spec); + customFlinkMainContainer(sessionClusterDTO, spec); + return spec; + } + + private void setPodTemplate(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) { + podTemplateHandler.handle(sessionClusterDTO, spec); + } + + private void enableFileSystem(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) { + fileSystemPluginHandler.handle(sessionClusterDTO, spec); + } + + private void addLogging(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) { + loggingHandler.handle(sessionClusterDTO.getLogConfiguration(), spec); + } + + private void customFlinkMainContainer(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) { + flinkMainContainerHandler.handle(sessionClusterDTO, spec); + } +} diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileSystemPluginHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileSystemPluginHandler.java index dd54f139e..83bbeeb42 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileSystemPluginHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileSystemPluginHandler.java @@ -18,13 +18,15 @@ package cn.sliew.scaleph.application.flink.resource.handler; +import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; +import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec; +import cn.sliew.scaleph.application.flink.resource.definition.job.instance.FlinkJobInstanceConverterFactory; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO; +import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO; import cn.sliew.scaleph.common.dict.flink.FlinkVersion; import cn.sliew.scaleph.common.util.NetUtils; import cn.sliew.scaleph.config.kubernetes.resource.ResourceNames; import cn.sliew.scaleph.config.storage.S3FileSystemProperties; -import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; -import cn.sliew.scaleph.application.flink.resource.definition.job.instance.FlinkJobInstanceConverterFactory; import io.fabric8.kubernetes.api.model.EnvVar; import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.PodBuilder; @@ -51,20 +53,30 @@ public class FileSystemPluginHandler { public void handle(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) { PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder()); - handlePodTemplate(jobDTO, podBuilder); + handlePodTemplate(FlinkJobInstanceConverterFactory.getFlinkVersion(jobDTO), podBuilder); + spec.setPodTemplate(podBuilder.build()); + + Map flinkConfiguration = Optional.ofNullable(spec.getFlinkConfiguration()).orElse(new HashMap<>()); + addFileSystemConfigOption(flinkConfiguration); + } + + public void handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) { + PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder()); + FlinkVersion flinkVersion = FlinkVersion.of(sessionClusterDTO.getKubernetesOptions().getFlinkVersion()); + handlePodTemplate(flinkVersion, podBuilder); spec.setPodTemplate(podBuilder.build()); Map flinkConfiguration = Optional.ofNullable(spec.getFlinkConfiguration()).orElse(new HashMap<>()); addFileSystemConfigOption(flinkConfiguration); } - private void handlePodTemplate(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) { + private void handlePodTemplate(FlinkVersion flinkVersion, PodBuilder builder) { builder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME) .endMetadata(); PodFluent.SpecNested spec = builder.editOrNewSpec(); ContainerUtil.findFlinkMainContainer(spec) - .addAllToEnv(buildEnableFileSystemEnv(jobDTO)) + .addAllToEnv(buildEnableFileSystemEnv(flinkVersion)) .endContainer(); spec.endSpec(); @@ -80,10 +92,9 @@ void addFileSystemConfigOption(Map flinkConfiguration) { } } - private List buildEnableFileSystemEnv(WsFlinkKubernetesJobDTO jobDTO) { + private List buildEnableFileSystemEnv(FlinkVersion flinkVersion) { EnvVarBuilder builder = new EnvVarBuilder(); builder.withName(FILE_SYSTEM_ENV_NAME); - FlinkVersion flinkVersion = FlinkJobInstanceConverterFactory.getFlinkVersion(jobDTO); builder.withValue(String.format(S3_FILE_SYSTEM_TEMPLATE, flinkVersion.getValue())); return Collections.singletonList(builder.build()); } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FlinkMainContainerHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FlinkMainContainerHandler.java index 6a0942af6..cb5db3239 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FlinkMainContainerHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FlinkMainContainerHandler.java @@ -19,10 +19,10 @@ package cn.sliew.scaleph.application.flink.resource.handler; import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; -import cn.sliew.scaleph.application.flink.resource.definition.job.instance.FlinkJobInstanceConverterFactory; +import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec; import cn.sliew.scaleph.application.flink.resource.definition.job.instance.MetadataHandler; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO; -import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO; import cn.sliew.scaleph.config.kubernetes.resource.ResourceAnnotations; import cn.sliew.scaleph.config.kubernetes.resource.ResourceNames; import io.fabric8.kubernetes.api.model.*; @@ -39,15 +39,26 @@ public class FlinkMainContainerHandler { public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder()); - handlePodTemplate(jobInstanceDTO, podBuilder); + podBuilder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME) + .addToAnnotations(buildAnnotations()) + .addToLabels(metadataHandler.generateLables(jobInstanceDTO)) + .endMetadata(); + + handlePodTemplate(podBuilder); spec.setPodTemplate(podBuilder.build()); } - private void handlePodTemplate(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, PodBuilder builder) { - builder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME) + public void handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) { + PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder()); + podBuilder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME) .addToAnnotations(buildAnnotations()) - .addToLabels(buildLabels(jobInstanceDTO)) + .addToLabels(metadataHandler.generateLables(sessionClusterDTO)) .endMetadata(); + handlePodTemplate(podBuilder); + spec.setPodTemplate(podBuilder.build()); + } + + private void handlePodTemplate(PodBuilder builder) { PodFluent.SpecNested spec = builder.editOrNewSpec(); ContainerUtil.findFlinkMainContainer(spec) @@ -65,10 +76,6 @@ private Map buildAnnotations() { return Collections.emptyMap(); } - private Map buildLabels(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) { - return metadataHandler.generateLables(jobInstanceDTO); - } - private List buildMetricsPorts() { List ports = new ArrayList<>(); ports.add(new ContainerPortBuilder().withName("jmx-metrics").withContainerPort(8789).withProtocol("TCP").build()); diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FlinkVersionMapping.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FlinkVersionMapping.java index c181295bd..ac399aad9 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FlinkVersionMapping.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FlinkVersionMapping.java @@ -29,7 +29,7 @@ public enum FlinkVersionMapping { V_1_19(OperatorFlinkVersion.v1_19, FlinkVersion.V_1_19_0, FlinkVersion.V_1_19_0), - V_1_18(OperatorFlinkVersion.v1_18, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_0), + V_1_18(OperatorFlinkVersion.v1_18, FlinkVersion.V_1_18_0, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_0), V_1_17(OperatorFlinkVersion.v1_17, FlinkVersion.V_1_17_2, FlinkVersion.V_1_17_2, FlinkVersion.V_1_17_1, FlinkVersion.V_1_17_0), V_1_16(OperatorFlinkVersion.v1_16, FlinkVersion.V_1_16_3, FlinkVersion.V_1_16_3, FlinkVersion.V_1_16_2, FlinkVersion.V_1_16_1, FlinkVersion.V_1_16_0), V_1_15(OperatorFlinkVersion.v1_15, FlinkVersion.V_1_15_4, FlinkVersion.V_1_15_4, FlinkVersion.V_1_15_3, FlinkVersion.V_1_15_2, FlinkVersion.V_1_15_1, FlinkVersion.V_1_15_0), diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/LoggingHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/LoggingHandler.java index 105284c00..81b862e06 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/LoggingHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/LoggingHandler.java @@ -19,6 +19,7 @@ package cn.sliew.scaleph.application.flink.resource.handler; import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; +import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec; import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; @@ -40,4 +41,14 @@ public void handle(Map logConfiguration, FlinkDeploymentSpec spe } } + public void handle(Map logConfiguration, FlinkSessionClusterSpec spec) { + Map configuration = Optional.ofNullable(spec.getFlinkConfiguration()).orElse(new HashMap<>()); + Map merge = TemplateMerger.merge(configuration, logConfiguration, Map.class); + if (CollectionUtils.isEmpty(merge) == false) { + spec.setLogConfiguration(null); + } else { + spec.setLogConfiguration(merge); + } + } + } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/PodTemplateHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/PodTemplateHandler.java index 3149628d6..03e48e8fc 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/PodTemplateHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/PodTemplateHandler.java @@ -19,8 +19,10 @@ package cn.sliew.scaleph.application.flink.resource.handler; import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; +import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec; import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO; +import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO; import io.fabric8.kubernetes.api.model.Pod; import org.springframework.stereotype.Component; @@ -31,4 +33,9 @@ public void handle(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) { Pod merge = TemplateMerger.merge(spec.getPodTemplate(), jobDTO.getFlinkDeployment().getPodTemplate(), Pod.class); spec.setPodTemplate(merge); } + + public void handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) { + Pod merge = TemplateMerger.merge(spec.getPodTemplate(), sessionClusterDTO.getPodTemplate(), Pod.class); + spec.setPodTemplate(merge); + } } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/WsFlinkKubernetesSessionClusterService.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/WsFlinkKubernetesSessionClusterService.java index 0fd2f3541..af1a9551b 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/WsFlinkKubernetesSessionClusterService.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/WsFlinkKubernetesSessionClusterService.java @@ -41,7 +41,7 @@ public interface WsFlinkKubernetesSessionClusterService { WsFlinkKubernetesSessionClusterDTO fromTemplate(Long templateId); - FlinkSessionCluster asYAML(WsFlinkKubernetesSessionClusterDTO dto); + FlinkSessionCluster asYaml(WsFlinkKubernetesSessionClusterDTO dto); int insert(WsFlinkKubernetesSessionClusterDTO dto); diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesSessionClusterServiceImpl.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesSessionClusterServiceImpl.java index 12c4504d3..f1f2d65f6 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesSessionClusterServiceImpl.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesSessionClusterServiceImpl.java @@ -20,22 +20,22 @@ import cn.sliew.milky.common.exception.Rethrower; import cn.sliew.milky.common.util.JacksonUtil; -import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesSessionClusterListParam; -import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesSessionClusterSelectListParam; -import cn.sliew.scaleph.common.dict.common.YesOrNo; -import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; -import cn.sliew.scaleph.common.util.UUIDUtil; -import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesSessionCluster; -import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesSessionClusterMapper; import cn.sliew.scaleph.application.flink.operator.status.FlinkDeploymentStatus; import cn.sliew.scaleph.application.flink.resource.definition.sessioncluster.FlinkSessionCluster; -import cn.sliew.scaleph.application.flink.resource.definition.sessioncluster.FlinkSessionClusterConverter; +import cn.sliew.scaleph.application.flink.resource.definition.sessioncluster.FlinkSessionClusterConverterFactory; import cn.sliew.scaleph.application.flink.service.FlinkKubernetesOperatorService; import cn.sliew.scaleph.application.flink.service.WsFlinkKubernetesSessionClusterService; import cn.sliew.scaleph.application.flink.service.WsFlinkKubernetesTemplateService; import cn.sliew.scaleph.application.flink.service.convert.WsFlinkKubernetesSessionClusterConvert; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesTemplateDTO; +import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesSessionClusterListParam; +import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesSessionClusterSelectListParam; +import cn.sliew.scaleph.common.dict.common.YesOrNo; +import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; +import cn.sliew.scaleph.common.util.UUIDUtil; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesSessionCluster; +import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesSessionClusterMapper; import cn.sliew.scaleph.engine.sql.gateway.services.WsFlinkSqlGatewayService; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; @@ -68,6 +68,8 @@ public class WsFlinkKubernetesSessionClusterServiceImpl implements WsFlinkKubern private FlinkKubernetesOperatorService flinkKubernetesOperatorService; @Autowired private WsFlinkSqlGatewayService wsFlinkSqlGatewayService; + @Autowired + private FlinkSessionClusterConverterFactory flinkSessionClusterConverterFactory; /** * Start already exists sql-gateways on app startup. @@ -143,8 +145,8 @@ public WsFlinkKubernetesSessionClusterDTO fromTemplate(Long templateId) { } @Override - public FlinkSessionCluster asYAML(WsFlinkKubernetesSessionClusterDTO dto) { - return FlinkSessionClusterConverter.INSTANCE.convertTo(dto); + public FlinkSessionCluster asYaml(WsFlinkKubernetesSessionClusterDTO dto) { + return flinkSessionClusterConverterFactory.convert(dto); } @Override @@ -261,7 +263,7 @@ public int deleteBatch(List ids) { @Override public void deploy(Long id) throws Exception { WsFlinkKubernetesSessionClusterDTO sessionClusterDTO = selectOne(id); - flinkKubernetesOperatorService.deploySessionCluster(sessionClusterDTO.getClusterCredentialId(), asYAML(sessionClusterDTO)); + flinkKubernetesOperatorService.deploySessionCluster(sessionClusterDTO.getClusterCredentialId(), asYaml(sessionClusterDTO)); WsFlinkKubernetesSessionCluster record = new WsFlinkKubernetesSessionCluster(); record.setId(sessionClusterDTO.getId()); record.setDeployed(YesOrNo.YES); @@ -272,7 +274,7 @@ public void deploy(Long id) throws Exception { public void shutdown(Long id) throws Exception { WsFlinkKubernetesSessionClusterDTO sessionClusterDTO = selectOne(id); if (sessionClusterDTO.getDeployed() == YesOrNo.YES) { - flinkKubernetesOperatorService.shutdownSessionCluster(sessionClusterDTO.getClusterCredentialId(), asYAML(sessionClusterDTO)); + flinkKubernetesOperatorService.shutdownSessionCluster(sessionClusterDTO.getClusterCredentialId(), asYaml(sessionClusterDTO)); } } diff --git a/tools/kubernetes/flink/values.yaml b/tools/kubernetes/flink/values.yaml index 966846863..aa621ed45 100644 --- a/tools/kubernetes/flink/values.yaml +++ b/tools/kubernetes/flink/values.yaml @@ -24,7 +24,7 @@ defaultConfiguration: kubernetes.operator.reconcile.interval: 15 s kubernetes.operator.observer.progress-check.interval: 5 s - s3.endpoint: http://192.168.1.3:9000 + s3.endpoint: http://10.10.18.163:9000 s3.access-key: admin s3.secret-key: password s3.path.style.access: true From 394cf6c832b29214db9ddf40680d52567c82b456 Mon Sep 17 00:00:00 2001 From: wangqi Date: Tue, 4 Jun 2024 21:50:39 +0800 Subject: [PATCH 3/8] feature: add flink kubernetes operator deploy values --- tools/kubernetes/flink/values-sessioin.yaml | 35 +++++++++++++++++++++ tools/kubernetes/flink/values.yaml | 26 +-------------- 2 files changed, 36 insertions(+), 25 deletions(-) create mode 100644 tools/kubernetes/flink/values-sessioin.yaml diff --git a/tools/kubernetes/flink/values-sessioin.yaml b/tools/kubernetes/flink/values-sessioin.yaml new file mode 100644 index 000000000..ab83c0527 --- /dev/null +++ b/tools/kubernetes/flink/values-sessioin.yaml @@ -0,0 +1,35 @@ + +webhook: + create: false + +image: + repository: ghcr.io/flowerfine/flink-kubernetes-operator + +defaultConfiguration: + # If set to true, creates ConfigMaps/VolumeMounts. If set to false, no configuration will be created. + # All below fields will be ignored if create is set to false. + create: true + # If set to true, + # (1) loads the built-in default configuration + # (2) appends the below flink-conf and logging configuration overrides + # If set to false, loads just the overrides as in (2). + # This option has not effect, if create is equal to false. + append: true + flink-conf.yaml: |+ + # Flink Config Overrides + kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory + kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE + + kubernetes.operator.reconcile.interval: 15 s + kubernetes.operator.observer.progress-check.interval: 5 s + + s3.endpoint: http://${IP}:9000 + s3.access-key: admin + s3.secret-key: password + s3.path.style.access: true + +# set TimeZone from UTC to Asia/Shanghai +operatorPod: + env: + - name: TZ + value: Asia/Shanghai diff --git a/tools/kubernetes/flink/values.yaml b/tools/kubernetes/flink/values.yaml index aa621ed45..522154820 100644 --- a/tools/kubernetes/flink/values.yaml +++ b/tools/kubernetes/flink/values.yaml @@ -3,31 +3,7 @@ webhook: create: false image: - repository: ghcr.io/flowerfine/flink-kubernetes-operator - tag: 1.8.0 - -defaultConfiguration: - # If set to true, creates ConfigMaps/VolumeMounts. If set to false, no configuration will be created. - # All below fields will be ignored if create is set to false. - create: true - # If set to true, - # (1) loads the built-in default configuration - # (2) appends the below flink-conf and logging configuration overrides - # If set to false, loads just the overrides as in (2). - # This option has not effect, if create is equal to false. - append: true - flink-conf.yaml: |+ - # Flink Config Overrides - kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory - kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE - - kubernetes.operator.reconcile.interval: 15 s - kubernetes.operator.observer.progress-check.interval: 5 s - - s3.endpoint: http://10.10.18.163:9000 - s3.access-key: admin - s3.secret-key: password - s3.path.style.access: true + repository: apache/flink-kubernetes-operator # set TimeZone from UTC to Asia/Shanghai operatorPod: From e50b9be349b6c2f4b7b360b6191ad9338f9a48f6 Mon Sep 17 00:00:00 2001 From: wangqi Date: Tue, 4 Jun 2024 21:54:49 +0800 Subject: [PATCH 4/8] feature: add flink kubernetes operator deploy values --- .../flink/{values-sessioin.yaml => values-session.yaml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tools/kubernetes/flink/{values-sessioin.yaml => values-session.yaml} (100%) diff --git a/tools/kubernetes/flink/values-sessioin.yaml b/tools/kubernetes/flink/values-session.yaml similarity index 100% rename from tools/kubernetes/flink/values-sessioin.yaml rename to tools/kubernetes/flink/values-session.yaml From d65dd077899da15985e240da53bf804c2f66cf6f Mon Sep 17 00:00:00 2001 From: wangqi Date: Fri, 7 Jun 2024 14:57:52 +0800 Subject: [PATCH 5/8] feature: update flink download --- tools/docker/build/flink/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/docker/build/flink/Dockerfile b/tools/docker/build/flink/Dockerfile index 87b6aa63c..4cd2905a8 100644 --- a/tools/docker/build/flink/Dockerfile +++ b/tools/docker/build/flink/Dockerfile @@ -25,7 +25,7 @@ ENV FLINK_HOME /opt/flink # download from flink release page RUN mkdir -p $FLINK_HOME ; cd $FLINK_HOME ; \ - curl -LSO https://archive.apache.org/dist/flink/flink-${FLINK_VERSION}/$TAR_FILE ; \ + curl -LSOo $FLINK_HOME https://archive.apache.org/dist/flink/flink-${FLINK_VERSION}/$TAR_FILE ; \ tar -zxf $TAR_FILE --strip 1 -C . ; \ rm $TAR_FILE From 4821449aa5245d20419116e78119d124572402a5 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Sat, 8 Jun 2024 21:00:41 +0800 Subject: [PATCH 6/8] feature: update flink session job handler --- .../FlinkSessionJobArtifactHandler.java | 25 +++++++++++++ .../FlinkJobManagerEndpointServiceImpl.java | 2 +- ...FlinkKubernetesJobInstanceServiceImpl.java | 37 +++++++++++++++---- .../common/util/SeaTunnelReleaseUtil.java | 17 ++++++++- 4 files changed, 71 insertions(+), 10 deletions(-) diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/FlinkSessionJobArtifactHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/FlinkSessionJobArtifactHandler.java index 0820a04c7..769f82ab7 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/FlinkSessionJobArtifactHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/FlinkSessionJobArtifactHandler.java @@ -23,10 +23,16 @@ import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO; import cn.sliew.scaleph.common.dict.flink.FlinkJobType; import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind; +import cn.sliew.scaleph.common.util.SeaTunnelReleaseUtil; +import cn.sliew.scaleph.config.kubernetes.resource.ResourceNames; import cn.sliew.scaleph.dao.entity.master.ws.WsArtifactFlinkJar; +import cn.sliew.scaleph.dao.entity.master.ws.WsArtifactSeaTunnel; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; +import java.util.Arrays; +import java.util.List; + @Component public class FlinkSessionJobArtifactHandler implements ArtifactHandler { @@ -40,6 +46,10 @@ public boolean support(FlinkJobType flinkJobType) { switch (flinkJobType) { case JAR: return true; + case SEATUNNEL: + return true; + case SQL: + return false; default: return false; } @@ -51,6 +61,11 @@ public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Object spec) case JAR: addJarArtifact(jobInstanceDTO, flinkSessionJobSpec); break; + case SEATUNNEL: + addSeaTunnelArtifact(jobInstanceDTO, flinkSessionJobSpec); + break; + case SQL: + break; default: } } @@ -63,4 +78,14 @@ private void addJarArtifact(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Flin jobSpec.setArgs(StringUtils.split(artifactFlinkJar.getJarParams(), " ")); spec.setJob(jobSpec); } + + private void addSeaTunnelArtifact(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkSessionJobSpec spec) { + WsArtifactSeaTunnel artifactSeaTunnel = jobInstanceDTO.getWsFlinkKubernetesJob().getArtifactSeaTunnel(); + JobSpec jobSpec = new JobSpec(); + jobSpec.setJarURI(SeaTunnelReleaseUtil.seatunnelStarterUrl(SeaTunnelReleaseUtil.STARTER_REPO_URL, artifactSeaTunnel.getSeaTunnelVersion().getValue())); + jobSpec.setEntryClass(SeaTunnelReleaseUtil.SEATUNNEL_MAIN_CLASS); + List args = Arrays.asList("--config", ResourceNames.SEATUNNEL_CONF_FILE_PATH); + jobSpec.setArgs(args.toArray(new String[2])); + spec.setJob(jobSpec); + } } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/FlinkJobManagerEndpointServiceImpl.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/FlinkJobManagerEndpointServiceImpl.java index 070428439..03d95790f 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/FlinkJobManagerEndpointServiceImpl.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/FlinkJobManagerEndpointServiceImpl.java @@ -57,7 +57,7 @@ public URI getJobManagerEndpoint(Long jobInstanceId) { switch (jobDTO.getDeploymentKind()) { case FLINK_SESSION_JOB: WsFlinkKubernetesSessionClusterDTO sessionClusterDTO = jobDTO.getFlinkSessionCluster(); - return getJobManagerEndpoint(sessionClusterDTO.getNamespace(), name, sessionClusterDTO.getClusterCredentialId()).orElse(null); + return getSessionClusterJobManagerEndpoint(sessionClusterDTO.getId()); case FLINK_DEPLOYMENT: WsFlinkKubernetesDeploymentDTO deploymentDTO = jobDTO.getFlinkDeployment(); return getJobManagerEndpoint(deploymentDTO.getNamespace(), name, deploymentDTO.getClusterCredentialId()).orElse(null); diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java index 23b2d5243..33089d32c 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java @@ -21,6 +21,7 @@ import cn.sliew.milky.common.exception.Rethrower; import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; +import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionJobSpec; import cn.sliew.scaleph.application.flink.operator.spec.JobState; import cn.sliew.scaleph.application.flink.operator.status.*; import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger; @@ -216,13 +217,18 @@ public void restart(Long id) throws Exception { Object spec = genericKubernetesResource.get("spec"); switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: - String json = JacksonUtil.toJsonString(spec); - FlinkDeploymentSpec flinkDeploymentSpec = JacksonUtil.parseJsonString(json, FlinkDeploymentSpec.class); + String deploymentJson = JacksonUtil.toJsonString(spec); + FlinkDeploymentSpec flinkDeploymentSpec = JacksonUtil.parseJsonString(deploymentJson, FlinkDeploymentSpec.class); flinkDeploymentSpec.setRestartNonce(System.currentTimeMillis()); genericKubernetesResource.setAdditionalProperty("spec", flinkDeploymentSpec); flinkKubernetesOperatorService.applyJob(jobDTO.getFlinkDeployment().getClusterCredentialId(), Serialization.asYaml(genericKubernetesResource)); return; case FLINK_SESSION_JOB: + String sessionJobJson = JacksonUtil.toJsonString(spec); + FlinkSessionJobSpec flinkSessionJobSpec = JacksonUtil.parseJsonString(sessionJobJson, FlinkSessionJobSpec.class); + flinkSessionJobSpec.setRestartNonce(System.currentTimeMillis()); + genericKubernetesResource.setAdditionalProperty("spec", flinkSessionJobSpec); + flinkKubernetesOperatorService.applyJob(jobDTO.getFlinkSessionCluster().getClusterCredentialId(), Serialization.asYaml(genericKubernetesResource)); return; default: } @@ -239,13 +245,18 @@ public void triggerSavepoint(Long id) throws Exception { Object spec = genericKubernetesResource.get("spec"); switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: - String json = JacksonUtil.toJsonString(spec); - FlinkDeploymentSpec flinkDeploymentSpec = JacksonUtil.parseJsonString(json, FlinkDeploymentSpec.class); + String deploymentJson = JacksonUtil.toJsonString(spec); + FlinkDeploymentSpec flinkDeploymentSpec = JacksonUtil.parseJsonString(deploymentJson, FlinkDeploymentSpec.class); flinkDeploymentSpec.getJob().setSavepointTriggerNonce(System.currentTimeMillis()); genericKubernetesResource.setAdditionalProperty("spec", flinkDeploymentSpec); flinkKubernetesOperatorService.applyJob(jobDTO.getFlinkDeployment().getClusterCredentialId(), Serialization.asYaml(genericKubernetesResource)); return; case FLINK_SESSION_JOB: + String sessionJobJson = JacksonUtil.toJsonString(spec); + FlinkSessionJobSpec flinkSessionJobSpec = JacksonUtil.parseJsonString(sessionJobJson, FlinkSessionJobSpec.class); + flinkSessionJobSpec.getJob().setSavepointTriggerNonce(System.currentTimeMillis()); + genericKubernetesResource.setAdditionalProperty("spec", flinkSessionJobSpec); + flinkKubernetesOperatorService.applyJob(jobDTO.getFlinkSessionCluster().getClusterCredentialId(), Serialization.asYaml(genericKubernetesResource)); return; default: } @@ -262,13 +273,18 @@ public void suspend(Long id) throws Exception { Object spec = genericKubernetesResource.get("spec"); switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: - String json = JacksonUtil.toJsonString(spec); - FlinkDeploymentSpec flinkDeploymentSpec = JacksonUtil.parseJsonString(json, FlinkDeploymentSpec.class); + String deploymentJson = JacksonUtil.toJsonString(spec); + FlinkDeploymentSpec flinkDeploymentSpec = JacksonUtil.parseJsonString(deploymentJson, FlinkDeploymentSpec.class); flinkDeploymentSpec.getJob().setState(JobState.SUSPENDED); genericKubernetesResource.setAdditionalProperty("spec", flinkDeploymentSpec); flinkKubernetesOperatorService.applyJob(jobDTO.getFlinkDeployment().getClusterCredentialId(), Serialization.asYaml(genericKubernetesResource)); return; case FLINK_SESSION_JOB: + String sessionJobJson = JacksonUtil.toJsonString(spec); + FlinkSessionJobSpec flinkSessionJobSpec = JacksonUtil.parseJsonString(sessionJobJson, FlinkSessionJobSpec.class); + flinkSessionJobSpec.getJob().setState(JobState.SUSPENDED); + genericKubernetesResource.setAdditionalProperty("spec", flinkSessionJobSpec); + flinkKubernetesOperatorService.applyJob(jobDTO.getFlinkSessionCluster().getClusterCredentialId(), Serialization.asYaml(genericKubernetesResource)); return; default: } @@ -285,13 +301,18 @@ public void resume(Long id) throws Exception { Object spec = genericKubernetesResource.get("spec"); switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: - String json = JacksonUtil.toJsonString(spec); - FlinkDeploymentSpec flinkDeploymentSpec = JacksonUtil.parseJsonString(json, FlinkDeploymentSpec.class); + String deploymentJson = JacksonUtil.toJsonString(spec); + FlinkDeploymentSpec flinkDeploymentSpec = JacksonUtil.parseJsonString(deploymentJson, FlinkDeploymentSpec.class); flinkDeploymentSpec.getJob().setState(JobState.RUNNING); genericKubernetesResource.setAdditionalProperty("spec", flinkDeploymentSpec); flinkKubernetesOperatorService.applyJob(jobDTO.getFlinkDeployment().getClusterCredentialId(), Serialization.asYaml(genericKubernetesResource)); return; case FLINK_SESSION_JOB: + String sessionJobJson = JacksonUtil.toJsonString(spec); + FlinkSessionJobSpec flinkSessionJobSpec = JacksonUtil.parseJsonString(sessionJobJson, FlinkSessionJobSpec.class); + flinkSessionJobSpec.getJob().setState(JobState.RUNNING); + genericKubernetesResource.setAdditionalProperty("spec", flinkSessionJobSpec); + flinkKubernetesOperatorService.applyJob(jobDTO.getFlinkSessionCluster().getClusterCredentialId(), Serialization.asYaml(genericKubernetesResource)); return; default: } diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/SeaTunnelReleaseUtil.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/SeaTunnelReleaseUtil.java index c72ba30f5..d59b13a19 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/SeaTunnelReleaseUtil.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/SeaTunnelReleaseUtil.java @@ -47,7 +47,8 @@ public enum SeaTunnelReleaseUtil { public static final String STARTER_REPO_URL = "https://repo1.maven.org/maven2/org/apache/seatunnel"; - public static final String STARTER_JAR_NAME = "seatunnel-flink-15-starter.jar"; + public static final String STARTER_NAME = "seatunnel-flink-15-starter"; + public static final String STARTER_JAR_NAME = STARTER_NAME + ".jar"; public static final String SEATUNNEL_MAIN_CLASS = "org.apache.seatunnel.core.starter.flink.SeaTunnelFlink"; public static final String SEATUNNEL_PLUGIN_MAPPING = "plugin-mapping.properties"; @@ -111,6 +112,20 @@ public static Path seatunnelConnector(Path rootDir, String connector) { return seatunnelConnectorsRootDir(rootDir).resolve(connector); } + public static String seatunnelStarterUrl(String repoUrl, String version) { + if (StringUtils.endsWithIgnoreCase(repoUrl, "/")) { + repoUrl = StringUtils.removeEndIgnoreCase(repoUrl, "/"); + } + Map variables = Map.of( + "repoUrl", repoUrl, + "version", version, + "connector", STARTER_NAME, + "jar", convertToJar(version, STARTER_NAME)); + StrSubstitutor substitutor = new StrSubstitutor(variables); + String template = "${repoUrl}/${connector}/${version}/${jar}"; + return substitutor.replace(template); + } + public static String seatunnelConnectorUrl(String repoUrl, String version, String connector) { if (StringUtils.endsWithIgnoreCase(repoUrl, "/")) { repoUrl = StringUtils.removeEndIgnoreCase(repoUrl, "/"); From 0757acf5549f426be2f4d89a86794e6b8f0fbb05 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Sat, 8 Jun 2024 21:17:01 +0800 Subject: [PATCH 7/8] feature: update flink session job handler --- .../Engine/Compute/Flink/Job/Detail/index.tsx | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/index.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/index.tsx index 7b9601daa..9e3f2f687 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/index.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/index.tsx @@ -141,6 +141,48 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { + { + WsFlinkKubernetesJobService.suspend(props.flinkKubernetesJobDetail.job.jobInstance.id).then(response => { + if (response.success) { + message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); + } + }) + }} + > + + + + { + WsFlinkKubernetesJobService.resume(props.flinkKubernetesJobDetail.job.jobInstance.id).then(response => { + if (response.success) { + message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); + } + }) + }} + > + + +