Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][scaleph-application-flink] update flink kubernetes session cluster #735

Merged
merged 8 commits into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

name: Release-Docker-Flink-CDC
name: Release-Docker-Flink-Kubernetes-Operator

on:
workflow_dispatch:
Expand All @@ -26,10 +26,18 @@ on:
type: choice
options:
- 1.8.0
flinkVersion:
description: 'flink version'
required: true
default: '1.18.1'
type: choice
options:
- 1.18.1

env:
HUB: ghcr.io/flowerfine/flink-kubernetes-operator
FLINK_KUBERNETES_OPERATOR_VERSION: ${{ inputs.flinkKubernetesOperatorVersion }}
FLINK_VERSION: ${{ inputs.flinkVersion }}

jobs:
build:
Expand Down Expand Up @@ -65,6 +73,7 @@ jobs:
context: .
build-args: |
FLINK_KUBERNETES_OPERATOR_VERSION=${{ env.FLINK_KUBERNETES_OPERATOR_VERSION }}
FLINK_VERSION=${{ env.FLINK_VERSION }}
platforms: linux/amd64,linux/arm64
file: tools/docker/build/flink-kubernetes-operator/Dockerfile
tags: ${{ env.HUB }}:${{ env.FLINK_KUBERNETES_OPERATOR_VERSION }}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public ResponseEntity<ResponseVO<WsFlinkKubernetesSessionClusterDTO>> selectOne(
@PostMapping("asYAML")
@Operation(summary = "转换 SessionCluster", description = "转换 SessionCluster")
public ResponseEntity<ResponseVO<FlinkSessionCluster>> asYAML(@RequestBody WsFlinkKubernetesSessionClusterDTO dto) {
FlinkSessionCluster sessionCluster = wsFlinkKubernetesSessionClusterService.asYAML(dto);
FlinkSessionCluster sessionCluster = wsFlinkKubernetesSessionClusterService.asYaml(dto);
return new ResponseEntity(ResponseVO.success(sessionCluster), HttpStatus.OK);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.common.dict.flink.FlinkJobType;
import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind;
import cn.sliew.scaleph.common.util.SeaTunnelReleaseUtil;
import cn.sliew.scaleph.config.kubernetes.resource.ResourceNames;
import cn.sliew.scaleph.dao.entity.master.ws.WsArtifactFlinkJar;
import cn.sliew.scaleph.dao.entity.master.ws.WsArtifactSeaTunnel;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.List;

@Component
public class FlinkSessionJobArtifactHandler implements ArtifactHandler {

Expand All @@ -40,6 +46,10 @@ public boolean support(FlinkJobType flinkJobType) {
switch (flinkJobType) {
case JAR:
return true;
case SEATUNNEL:
return true;
case SQL:
return false;
default:
return false;
}
Expand All @@ -51,6 +61,11 @@ public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Object spec)
case JAR:
addJarArtifact(jobInstanceDTO, flinkSessionJobSpec);
break;
case SEATUNNEL:
addSeaTunnelArtifact(jobInstanceDTO, flinkSessionJobSpec);
break;
case SQL:
break;
default:
}
}
Expand All @@ -63,4 +78,14 @@ private void addJarArtifact(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Flin
jobSpec.setArgs(StringUtils.split(artifactFlinkJar.getJarParams(), " "));
spec.setJob(jobSpec);
}

private void addSeaTunnelArtifact(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkSessionJobSpec spec) {
WsArtifactSeaTunnel artifactSeaTunnel = jobInstanceDTO.getWsFlinkKubernetesJob().getArtifactSeaTunnel();
JobSpec jobSpec = new JobSpec();
jobSpec.setJarURI(SeaTunnelReleaseUtil.seatunnelStarterUrl(SeaTunnelReleaseUtil.STARTER_REPO_URL, artifactSeaTunnel.getSeaTunnelVersion().getValue()));
jobSpec.setEntryClass(SeaTunnelReleaseUtil.SEATUNNEL_MAIN_CLASS);
List<String> args = Arrays.asList("--config", ResourceNames.SEATUNNEL_CONF_FILE_PATH);
jobSpec.setArgs(args.toArray(new String[2]));
spec.setJob(jobSpec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO;
import cn.sliew.scaleph.config.kubernetes.resource.ResourceLabels;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
Expand All @@ -42,11 +43,26 @@ public ObjectMeta handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, ObjectM
return builder.build();
}

public ObjectMeta handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, ObjectMeta objectMeta) {
ObjectMetaBuilder builder = Optional.ofNullable(objectMeta)
.map(meta -> new ObjectMetaBuilder(meta))
.orElse(new ObjectMetaBuilder());

builder.withName(sessionClusterDTO.getSessionClusterId());
addSessionClusterLables(sessionClusterDTO, builder);
return builder.build();
}

private void addJobLables(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, ObjectMetaBuilder builder) {
Map<String, String> lables = generateLables(jobInstanceDTO);
builder.addToLabels(lables);
}

private void addSessionClusterLables(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, ObjectMetaBuilder builder) {
Map<String, String> lables = generateLables(sessionClusterDTO);
builder.addToLabels(lables);
}

public Map<String, String> generateLables(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) {
Map<String, String> labels = new HashMap<>();
WsFlinkKubernetesJobDTO jobDTO = jobInstanceDTO.getWsFlinkKubernetesJob();
Expand All @@ -63,4 +79,15 @@ public Map<String, String> generateLables(WsFlinkKubernetesJobInstanceDTO jobIns
return labels;
}

public Map<String, String> generateLables(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO) {
Map<String, String> labels = new HashMap<>();

labels.put(ResourceLabels.SCALEPH_LABEL_PLATFROM, ResourceLabels.SCALEPH);
labels.put(ResourceLabels.SCALEPH_LABEL_NAME, sessionClusterDTO.getName());
labels.put(ResourceLabels.SCALEPH_LABEL_SESSION_CLUSTER_ID, sessionClusterDTO.getSessionClusterId());
labels.put(ResourceLabels.SCALEPH_LABEL_CREATOR, sessionClusterDTO.getCreator());
labels.put(ResourceLabels.SCALEPH_LABEL_EDITOR, sessionClusterDTO.getEditor());
return labels;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.application.flink.resource.definition.sessioncluster;

import cn.sliew.scaleph.application.flink.resource.definition.job.instance.MetadataHandler;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FlinkSessionClusterConverterFactory {

@Autowired
private MetadataHandler metadataHandlerl;
@Autowired
private FlinkSessionClusterSpecHandler flinkSessionClusterSpecHandler;

public FlinkSessionCluster convert(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO) {
FlinkSessionCluster flinkSessionCluster = FlinkSessionClusterConverter.INSTANCE.convertTo(sessionClusterDTO);
flinkSessionCluster.setMetadata(metadataHandlerl.handle(sessionClusterDTO, flinkSessionCluster.getMetadata()));
flinkSessionCluster.setSpec(flinkSessionClusterSpecHandler.handle(sessionClusterDTO, flinkSessionCluster.getSpec()));
return flinkSessionCluster;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.application.flink.resource.definition.sessioncluster;

import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec;
import cn.sliew.scaleph.application.flink.resource.handler.FileSystemPluginHandler;
import cn.sliew.scaleph.application.flink.resource.handler.FlinkMainContainerHandler;
import cn.sliew.scaleph.application.flink.resource.handler.LoggingHandler;
import cn.sliew.scaleph.application.flink.resource.handler.PodTemplateHandler;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
public class FlinkSessionClusterSpecHandler {

@Autowired
private FileSystemPluginHandler fileSystemPluginHandler;
@Autowired
private LoggingHandler loggingHandler;
@Autowired
private FlinkMainContainerHandler flinkMainContainerHandler;
@Autowired
private PodTemplateHandler podTemplateHandler;

public FlinkSessionClusterSpec handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec flinkSessionClusterSpec) {
FlinkSessionClusterSpec spec = Optional.ofNullable(flinkSessionClusterSpec).orElse(new FlinkSessionClusterSpec());
setPodTemplate(sessionClusterDTO, spec);
enableFileSystem(sessionClusterDTO, spec);
addLogging(sessionClusterDTO, spec);
customFlinkMainContainer(sessionClusterDTO, spec);
return spec;
}

private void setPodTemplate(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) {
podTemplateHandler.handle(sessionClusterDTO, spec);
}

private void enableFileSystem(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) {
fileSystemPluginHandler.handle(sessionClusterDTO, spec);
}

private void addLogging(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) {
loggingHandler.handle(sessionClusterDTO.getLogConfiguration(), spec);
}

private void customFlinkMainContainer(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) {
flinkMainContainerHandler.handle(sessionClusterDTO, spec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package cn.sliew.scaleph.application.flink.resource.handler;

import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec;
import cn.sliew.scaleph.application.flink.resource.definition.job.instance.FlinkJobInstanceConverterFactory;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO;
import cn.sliew.scaleph.common.dict.flink.FlinkVersion;
import cn.sliew.scaleph.common.util.NetUtils;
import cn.sliew.scaleph.config.kubernetes.resource.ResourceNames;
import cn.sliew.scaleph.config.storage.S3FileSystemProperties;
import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.application.flink.resource.definition.job.instance.FlinkJobInstanceConverterFactory;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.PodBuilder;
Expand All @@ -51,20 +53,30 @@ public class FileSystemPluginHandler {

public void handle(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) {
PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder());
handlePodTemplate(jobDTO, podBuilder);
handlePodTemplate(FlinkJobInstanceConverterFactory.getFlinkVersion(jobDTO), podBuilder);
spec.setPodTemplate(podBuilder.build());

Map<String, String> flinkConfiguration = Optional.ofNullable(spec.getFlinkConfiguration()).orElse(new HashMap<>());
addFileSystemConfigOption(flinkConfiguration);
}

public void handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) {
PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder());
FlinkVersion flinkVersion = FlinkVersion.of(sessionClusterDTO.getKubernetesOptions().getFlinkVersion());
handlePodTemplate(flinkVersion, podBuilder);
spec.setPodTemplate(podBuilder.build());

Map<String, String> flinkConfiguration = Optional.ofNullable(spec.getFlinkConfiguration()).orElse(new HashMap<>());
addFileSystemConfigOption(flinkConfiguration);
}

private void handlePodTemplate(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) {
private void handlePodTemplate(FlinkVersion flinkVersion, PodBuilder builder) {
builder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME)
.endMetadata();
PodFluent<PodBuilder>.SpecNested<PodBuilder> spec = builder.editOrNewSpec();

ContainerUtil.findFlinkMainContainer(spec)
.addAllToEnv(buildEnableFileSystemEnv(jobDTO))
.addAllToEnv(buildEnableFileSystemEnv(flinkVersion))
.endContainer();

spec.endSpec();
Expand All @@ -80,10 +92,9 @@ void addFileSystemConfigOption(Map<String, String> flinkConfiguration) {
}
}

private List<EnvVar> buildEnableFileSystemEnv(WsFlinkKubernetesJobDTO jobDTO) {
private List<EnvVar> buildEnableFileSystemEnv(FlinkVersion flinkVersion) {
EnvVarBuilder builder = new EnvVarBuilder();
builder.withName(FILE_SYSTEM_ENV_NAME);
FlinkVersion flinkVersion = FlinkJobInstanceConverterFactory.getFlinkVersion(jobDTO);
builder.withValue(String.format(S3_FILE_SYSTEM_TEMPLATE, flinkVersion.getValue()));
return Collections.singletonList(builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package cn.sliew.scaleph.application.flink.resource.handler;

import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.application.flink.resource.definition.job.instance.FlinkJobInstanceConverterFactory;
import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec;
import cn.sliew.scaleph.application.flink.resource.definition.job.instance.MetadataHandler;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.common.dict.flink.FlinkVersion;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO;
import cn.sliew.scaleph.config.kubernetes.resource.ResourceAnnotations;
import cn.sliew.scaleph.config.kubernetes.resource.ResourceNames;
import io.fabric8.kubernetes.api.model.*;
Expand All @@ -39,15 +39,26 @@ public class FlinkMainContainerHandler {

public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder());
handlePodTemplate(jobInstanceDTO, podBuilder);
podBuilder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME)
.addToAnnotations(buildAnnotations())
.addToLabels(metadataHandler.generateLables(jobInstanceDTO))
.endMetadata();

handlePodTemplate(podBuilder);
spec.setPodTemplate(podBuilder.build());
}

private void handlePodTemplate(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, PodBuilder builder) {
builder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME)
public void handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) {
PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder());
podBuilder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME)
.addToAnnotations(buildAnnotations())
.addToLabels(buildLabels(jobInstanceDTO))
.addToLabels(metadataHandler.generateLables(sessionClusterDTO))
.endMetadata();
handlePodTemplate(podBuilder);
spec.setPodTemplate(podBuilder.build());
}

private void handlePodTemplate(PodBuilder builder) {
PodFluent<PodBuilder>.SpecNested<PodBuilder> spec = builder.editOrNewSpec();

ContainerUtil.findFlinkMainContainer(spec)
Expand All @@ -65,10 +76,6 @@ private Map<String, String> buildAnnotations() {
return Collections.emptyMap();
}

private Map<String, String> buildLabels(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) {
return metadataHandler.generateLables(jobInstanceDTO);
}

private List<ContainerPort> buildMetricsPorts() {
List<ContainerPort> ports = new ArrayList<>();
ports.add(new ContainerPortBuilder().withName("jmx-metrics").withContainerPort(8789).withProtocol("TCP").build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public enum FlinkVersionMapping {

V_1_19(OperatorFlinkVersion.v1_19, FlinkVersion.V_1_19_0, FlinkVersion.V_1_19_0),
V_1_18(OperatorFlinkVersion.v1_18, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_0),
V_1_18(OperatorFlinkVersion.v1_18, FlinkVersion.V_1_18_0, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_0),
V_1_17(OperatorFlinkVersion.v1_17, FlinkVersion.V_1_17_2, FlinkVersion.V_1_17_2, FlinkVersion.V_1_17_1, FlinkVersion.V_1_17_0),
V_1_16(OperatorFlinkVersion.v1_16, FlinkVersion.V_1_16_3, FlinkVersion.V_1_16_3, FlinkVersion.V_1_16_2, FlinkVersion.V_1_16_1, FlinkVersion.V_1_16_0),
V_1_15(OperatorFlinkVersion.v1_15, FlinkVersion.V_1_15_4, FlinkVersion.V_1_15_4, FlinkVersion.V_1_15_3, FlinkVersion.V_1_15_2, FlinkVersion.V_1_15_1, FlinkVersion.V_1_15_0),
Expand Down
Loading
Loading