Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][scaleph-engine-flink-kubernetes] support submit jar artifact on kubernetes through application mode #562

Merged
merged 7 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import cn.sliew.scaleph.engine.flink.kubernetes.resource.template.FlinkTemplateSpec;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import org.apache.flink.configuration.ConfigOptions;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -59,11 +60,17 @@ public static FlinkTemplate getDefaults() {

/**
 * Default template for session clusters: the shared template defaults merged
 * with the session-cluster service configuration. Entries already present in
 * the template take precedence over the session-cluster baseline.
 */
public static FlinkTemplate getSessionClusterDefaults() {
    FlinkTemplate template = createFlinkTemplateDefaults();
    Map<String, String> merged = new HashMap<>(createSessionClusterConfiguration());
    merged.putAll(template.getSpec().getFlinkConfiguration());
    template.getSpec().setFlinkConfiguration(merged);
    return create("session-cluster", "default", template);
}

/**
 * Default template for deployments: the shared template defaults merged with
 * the deployment service configuration. Template entries win over the
 * deployment baseline on key collisions.
 */
public static FlinkTemplate getDeploymentDefaults() {
    FlinkTemplate template = createFlinkTemplateDefaults();
    Map<String, String> merged = new HashMap<>(createDeploymentServiceConfiguration());
    merged.putAll(template.getSpec().getFlinkConfiguration());
    template.getSpec().setFlinkConfiguration(merged);
    return create("deployment", "default", template);
}

Expand Down Expand Up @@ -105,7 +112,6 @@ private static Map<String, String> createFlinkConfiguration() {
flinkConfiguration.putAll(createCheckpointConfiguration());
flinkConfiguration.putAll(createPeriodicSavepointConfiguration());
flinkConfiguration.putAll(createRestartConfiguration());
flinkConfiguration.putAll(createServiceConfiguration());
return flinkConfiguration;
}

Expand Down Expand Up @@ -148,10 +154,16 @@ private static Map<String, String> createRestartConfiguration() {
return flinkConfiguration;
}

public static Map<String, String> createServiceConfiguration() {
public static Map<String, String> createDeploymentServiceConfiguration() {
Map<String, String> serviceConfiguration = new HashMap<>();
serviceConfiguration.put("kubernetes.rest-service.exposed.type", ServiceExposedType.NODE_PORT.getValue());
serviceConfiguration.put("kubernetes.rest-service.exposed.node-port-address-type", NodePortAddressType.EXTERNAL_IP.getValue());
return serviceConfiguration;
}

/**
 * Flink REST service exposure settings for session clusters: the rest
 * endpoint is published through a LoadBalancer service. The node-port address
 * type is intentionally not set here — it only applies to NodePort exposure.
 */
public static Map<String, String> createSessionClusterConfiguration() {
    Map<String, String> config = new HashMap<>();
    config.put("kubernetes.rest-service.exposed.type", ServiceExposedType.LOAD_BALANCER.getValue());
    return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import cn.sliew.scaleph.engine.flink.kubernetes.resource.job.FlinkDeploymentJob;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.kubernetes.resource.definition.ResourceCustomizer;
import cn.sliew.scaleph.system.snowflake.utils.NetUtils;
import io.fabric8.kubernetes.api.model.*;

import java.util.*;

public enum FileFetcherFactory implements ResourceCustomizer<WsFlinkKubernetesJobDTO, FlinkDeploymentJob> {
INSTANCE;

private static final String FLINK_MAIN_CONTAINER_NAME = "flink-main-container";
private static final String FILE_FETCHER_CONTAINER_NAME = "scaleph-file-fetcher";
// private static final String FILE_FETCHER_CONTAINER_IMAGE = "ghcr.io/flowerfine/scaleph/scaleph-file-fetcher:latest";
private static final String FILE_FETCHER_CONTAINER_IMAGE = "scaleph-file-fetcher:dev";
Expand All @@ -40,19 +42,51 @@ public enum FileFetcherFactory implements ResourceCustomizer<WsFlinkKubernetesJo

private static final String FILE_FETCHER_VOLUME_NAME = "file-fetcher-volume";
public static final String TARGET_DIRECTORY = "/flink/usrlib/";
public static final String LOCAL_SCHEMA = "local://";
public static final String LOCAL_PATH = LOCAL_SCHEMA + TARGET_DIRECTORY;

/**
 * Applies artifact-fetching customizations to the deployment job: first the
 * cluster-wide pod template (shared volume + mount on the Flink main
 * container), then the job manager pod template (init containers that
 * download the jar artifacts).
 */
@Override
public void customize(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentJob job) {
    // Start from the existing pod template when one is set, otherwise build fresh.
    Pod existingTemplate = job.getSpec().getPodTemplate();
    PodBuilder templateBuilder = existingTemplate != null ? new PodBuilder(existingTemplate) : new PodBuilder();
    cusomizePodTemplate(templateBuilder);
    job.getSpec().setPodTemplate(templateBuilder.build());

    // Same pattern for the job manager spec.
    JobManagerSpec jobManagerSpec = job.getSpec().getJobManager();
    if (jobManagerSpec == null) {
        jobManagerSpec = new JobManagerSpec();
    }
    cusomizeJobManagerPodTemplate(jobDTO, jobManagerSpec);
    job.getSpec().setJobManager(jobManagerSpec);
}

/**
 * Customizes the cluster-wide pod template: registers the file-fetcher volume
 * on the pod spec and mounts it into the Flink main container, creating that
 * container entry if it does not exist yet.
 */
private void cusomizePodTemplate(PodBuilder builder) {
builder.editOrNewMetadata()
.withName("pod-template")
.endMetadata();
PodFluent.SpecNested<PodBuilder> spec = builder.editOrNewSpec();
spec.addToVolumes(buildVolume()); // add volumes
// Mount the shared volume on the existing flink-main-container, or declare
// the container ourselves so the operator's merge picks up the mount.
if (spec.hasMatchingContainer(containerBuilder -> containerBuilder.getName().equals(FLINK_MAIN_CONTAINER_NAME))) {
spec.editMatchingContainer((containerBuilder -> containerBuilder.getName().equals(FLINK_MAIN_CONTAINER_NAME)))
.addToVolumeMounts(buildVolumeMount()) // add volume mount
.endContainer();
} else {
spec.addNewContainer()
.withName(FLINK_MAIN_CONTAINER_NAME)
.addToVolumeMounts(buildVolumeMount()) // add volume mount
.endContainer();
}
spec.endSpec();
}

/**
 * Rebuilds the job manager pod template with artifact-fetching init
 * containers applied, and stores the result back on the given
 * {@link JobManagerSpec}. Attaching the spec to the deployment is the
 * caller's responsibility.
 */
private void cusomizeJobManagerPodTemplate(WsFlinkKubernetesJobDTO jobDTO, JobManagerSpec jobManager) {
    // Reuse the existing pod template when present, otherwise start empty.
    PodBuilder builder = Optional.of(jobManager)
            .map(JobManagerSpec::getPodTemplate)
            .map(pod -> new PodBuilder(pod))
            .orElse(new PodBuilder());
    doCustomize(jobDTO, builder);
    jobManager.setPodTemplate(builder.build());
    // NOTE(review): removed stray `job.getSpec().setJobManager(jobManager);` —
    // `job` is not in scope in this method and the caller already attaches the spec.
}

/**
 * Applies all job manager pod template customizations: names the template,
 * then registers init containers for the main artifact jar and any
 * additional jars.
 */
private void doCustomize(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) {
    builder.editOrNewMetadata()
            .withName("job-manager-pod-template")
            .endMetadata();
    addArtifactJar(jobDTO, builder);
    // NOTE(review): was invoked twice (a stale duplicate line); calling it
    // twice would register each additional-jar init container twice.
    addAdditionalJars(jobDTO, builder);
}

private void addArtifactJar(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) {
Expand All @@ -76,9 +110,7 @@ private void addAdditionalJars(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builde
}

/**
 * Registers a file-fetcher init container for the given jar artifact so the
 * jar is downloaded before the Flink main container starts.
 */
private void doAddJars(WsFlinkArtifactJar jarArtifact, PodBuilder builder) {
    // NOTE(review): the old editOrNewSpec/withSpec round-trip and the new
    // single fluent chain were both present; the chain alone is sufficient.
    builder.editOrNewSpec().addToInitContainers(addJarArtifact(jarArtifact)).endSpec();
}

private Container addJarArtifact(WsFlinkArtifactJar jarArtifact) {
Expand All @@ -87,6 +119,7 @@ private Container addJarArtifact(WsFlinkArtifactJar jarArtifact) {
builder.withImage(FILE_FETCHER_CONTAINER_IMAGE);
builder.withImagePullPolicy(ImagePullPolicy.IF_NOT_PRESENT.getValue());
builder.withArgs(buildFileFetcherArgs(jarArtifact));
builder.withEnv(buildEnvs());
builder.withResources(buildResource());
builder.withVolumeMounts(buildVolumeMount());
builder.withTerminationMessagePath("/dev/termination-log");
Expand All @@ -103,6 +136,14 @@ private List<String> buildFileFetcherArgs(WsFlinkArtifactJar jarArtifact) {
"-path", TARGET_DIRECTORY + jarArtifact.getFileName());
}

/**
 * Environment for the file-fetcher init container.
 * NOTE(review): the MinIO endpoint scheme and port 9000 are hard-coded
 * against this host's IP — confirm this matches the MinIO deployment before
 * relying on it outside a dev setup.
 */
private List<EnvVar> buildEnvs() {
    String endpoint = String.format("http://%s:9000", NetUtils.getLocalIP());
    EnvVar minioEndpoint = new EnvVarBuilder()
            .withName("MINIO_ENDPOINT")
            .withValue(endpoint)
            .build();
    return Arrays.asList(minioEndpoint);
}

private ResourceRequirements buildResource() {
ResourceRequirementsBuilder resourceRequirementsBuilder = new ResourceRequirementsBuilder();
Map resource = new HashMap();
Expand All @@ -113,11 +154,19 @@ private ResourceRequirements buildResource() {
return resourceRequirementsBuilder.build();
}


/** Mount of the shared file-fetcher volume at the jar target directory. */
private VolumeMount buildVolumeMount() {
    return new VolumeMountBuilder()
            .withName(FILE_FETCHER_VOLUME_NAME)
            .withMountPath(TARGET_DIRECTORY)
            .build();
}

/** Pod-scoped emptyDir volume shared between the fetcher and main containers. */
private Volume buildVolume() {
    return new VolumeBuilder()
            .withName(FILE_FETCHER_VOLUME_NAME)
            .withEmptyDir(new EmptyDirVolumeSource())
            .build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ public FlinkDeploymentJob convertTo(WsFlinkKubernetesJobDTO source) {
if (source.getFlinkArtifactJar() != null) {
WsFlinkArtifactJar flinkArtifactJar = source.getFlinkArtifactJar();
JobSpec jobSpec = new JobSpec();
jobSpec.setJarURI("local://" + FileFetcherFactory.TARGET_DIRECTORY + flinkArtifactJar.getFileName());
jobSpec.setJarURI(FileFetcherFactory.LOCAL_PATH + flinkArtifactJar.getFileName());
jobSpec.setEntryClass(flinkArtifactJar.getEntryClass());
jobSpec.setArgs(StringUtils.split(flinkArtifactJar.getJarParams(), " "));
spec.setJob(jobSpec);
}
if (source.getFlinkArtifactSql() != null) {
WsFlinkArtifactSql flinkArtifactSql = source.getFlinkArtifactSql();
JobSpec jobSpec = new JobSpec();
jobSpec.setJarURI("local:///sql-runner.jar");
jobSpec.setEntryClass("cn.sliew.engine.flink.sql.SqlRunner");
jobSpec.setJarURI(FileFetcherFactory.LOCAL_PATH + "sql-runner.jar");
jobSpec.setEntryClass("cn.sliew.scaleph.engine.sql.SqlRunner");
List<String> args = Arrays.asList("--script", flinkArtifactSql.getScript());
jobSpec.setArgs(args.toArray(new String[2]));
spec.setJob(jobSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ public class FetcherExecutor implements ApplicationRunner {

/**
 * Entry point of the file fetcher: parses the command-line options, resolves
 * a {@link FileFetcher} for the URI scheme, and downloads the file to the
 * target path. Failures are logged with the full argument list and rethrown
 * so the init container exits non-zero and Kubernetes can retry — silently
 * swallowing the error would let the Flink container start without its jar.
 */
@Override
public void run(ApplicationArguments args) throws Exception {
    try {
        log.info("命令行参数: {}", JacksonUtil.toJsonString(Arrays.asList(args.getSourceArgs())));
        CommandLine line = OptionsParser.parse(args.getSourceArgs(), true);
        FetchOptions options = new FetchOptions(line);
        Optional<FileFetcher> fileFetcher = fileFetcherFactory.find(options.getUri(), options.getProperties());
        fileFetcher.orElseThrow().fetch(options.getUri(), options.getPath());
    } catch (Exception e) {
        log.error("下载文件异常! 参数: {}", JacksonUtil.toJsonString(Arrays.asList(args.getSourceArgs())), e);
        throw e;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,42 @@

import java.io.*;
import java.net.URI;
import java.util.Arrays;
import java.util.List;

@Component
public class InternalFileFetcher implements FileFetcher {

private List<String> SCHEMAS = Arrays.asList("s3a", "oss", "hdfs", "file");

@Autowired
private FileSystemServiceImpl fileSystemService;

@Override
public boolean support(URI uri) {
return uri.getScheme().equals("scaleph");
return SCHEMAS.contains(uri.getScheme());
}

@Override
public void fetch(URI uri, String path) throws IOException {
if (fileSystemService.exists(uri.getPath()) == false) {
if (fileSystemService.exists(uri.toString()) == false) {
throw new FileNotFoundException(uri.getPath());
}
try (InputStream inputStream = fileSystemService.get(uri.getPath());
OutputStream outputStream = FileUtil.getOutputStream(new File(path))) {

try (InputStream inputStream = fileSystemService.get(uri.toString());
OutputStream outputStream = FileUtil.getOutputStream(createFile(path))) {
FileCopyUtils.copy(inputStream, outputStream);
}
}

/**
* fixme this can fix a strange question when flink-main-container mounts jar
* fixme never try to replace it through FileUtil or other utility
* fixme until you have solved such mount problem
*/
private File createFile(String path) throws IOException {
File file = new File(path);
file.createNewFile();
return file;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

package cn.sliew.scaleph.system.snowflake.utils;

import cn.sliew.milky.common.exception.Rethrower;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Enumeration;

/**
Expand Down Expand Up @@ -82,4 +85,21 @@ public static String getLocalAddress() {
return localAddress.getHostAddress();
}

/**
 * Returns the host address of the local machine as resolved by
 * {@link InetAddress#getLocalHost()}. A resolution failure is rethrown
 * unchecked via {@code Rethrower.throwAs}.
 */
public static String getLocalIP() {
    try {
        InetAddress localHost = InetAddress.getLocalHost();
        return localHost.getHostAddress();
    } catch (UnknownHostException cause) {
        Rethrower.throwAs(cause);
        return null; // unreachable: throwAs always throws
    }
}

/**
 * Returns the host name of the local machine as resolved by
 * {@link InetAddress#getLocalHost()}. A resolution failure is rethrown
 * unchecked via {@code Rethrower.throwAs}.
 */
public static String getLocalHost() {
    try {
        InetAddress localHost = InetAddress.getLocalHost();
        return localHost.getHostName();
    } catch (UnknownHostException cause) {
        Rethrower.throwAs(cause);
        return null; // unreachable: throwAs always throws
    }
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {history, useAccess, useIntl} from "umi";
import React, {useRef, useState} from "react";
import {Button, message, Modal, Space, Tag, Tooltip} from "antd";
import {Button, message, Modal, Popconfirm, Space, Tag, Tooltip} from "antd";
import {CaretRightOutlined, CloseOutlined, DeleteOutlined, EditOutlined, NodeIndexOutlined} from "@ant-design/icons";
import {ActionType, ProColumns, ProFormInstance, ProFormSelect, ProTable} from "@ant-design/pro-components";
import {DICT_TYPE, PRIVILEGE_CODE, WORKSPACE_CONF} from "@/constant";
Expand Down Expand Up @@ -79,7 +79,7 @@ const FlinkKubernetesJobWeb: React.FC = () => {
dataIndex: 'artifact',
hideInSearch: true,
render: (dom, entity) => {
return entity.flinkArtifactJar ? entity.flinkArtifactJar.wsFlinkArtifact?.name : (entity.flinkArtifactSql ? entity.flinkArtifactSql?.wsFlinkArtifact?.name: entity.wsDiJob?.wsFlinkArtifact?.name)
return entity.flinkArtifactJar ? entity.flinkArtifactJar.wsFlinkArtifact?.name : (entity.flinkArtifactSql ? entity.flinkArtifactSql?.wsFlinkArtifact?.name : entity.wsDiJob?.wsFlinkArtifact?.name)
},
},
{
Expand Down Expand Up @@ -135,25 +135,38 @@ const FlinkKubernetesJobWeb: React.FC = () => {
)}

{access.canAccess(PRIVILEGE_CODE.datadevJobEdit) && (
<Button
shape="default"
type="link"
icon={<CaretRightOutlined/>}
onClick={() => WsFlinkKubernetesJobService.deploy(record)}
<Popconfirm
title={intl.formatMessage({id: 'app.common.operate.submit.confirm.title'})}
onConfirm={() => {
WsFlinkKubernetesJobService.deploy(record).then(response => {
message.success(intl.formatMessage({id: 'app.common.operate.submit.success'}));
actionRef.current?.reload()
})
}}
>
{intl.formatMessage({id: 'app.common.operate.start.label'})}
</Button>
<Button
shape="default"
type="link"
icon={<CaretRightOutlined/>}
/>
</Popconfirm>
)}

{access.canAccess(PRIVILEGE_CODE.datadevJobEdit) && (
<Button
shape="default"
type="link"
icon={<CloseOutlined/>}
onClick={() => WsFlinkKubernetesJobService.shutdown(record)}
<Popconfirm
title={intl.formatMessage({id: 'app.common.operate.submit.confirm.title'})}
onConfirm={() => {
WsFlinkKubernetesJobService.shutdown(record).then(response => {
message.success(intl.formatMessage({id: 'app.common.operate.submit.success'}));
actionRef.current?.reload()
})
}}
>
{intl.formatMessage({id: 'app.common.operate.stop.label'})}
</Button>
<Button
shape="default"
type="link"
icon={<CloseOutlined/>}
/>
</Popconfirm>
)}

{access.canAccess(PRIVILEGE_CODE.datadevDatasourceDelete) && (
Expand Down
3 changes: 1 addition & 2 deletions tools/docker/build/scaleph-sql-templates/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,4 @@ RUN cd /scaleph/scaleph-engine/scaleph-sql-templates && \

# Build the Flink runtime image: copy the packaged SQL runner into the
# /flink/usrlib directory that the job's jarURI points at. Docker's COPY
# creates the destination directory, so no explicit mkdir is needed.
# NOTE(review): dropped the stale pre-change lines that copied the jar to
# /opt/flink/usrlib — only one destination is correct for the new jarURI.
FROM flink:${FLINK_VERSION}-java${JAVA_VERSION} as image
COPY --from=builder /scaleph/scaleph-engine/scaleph-sql-templates/target/sql-runner.jar /flink/usrlib/
Loading