Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][scaleph-ui-react] add flink kubernetes job deploy form #578

Merged
merged 13 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind;
import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentMode;
import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.common.dict.image.ImagePullPolicy;
import cn.sliew.scaleph.common.dict.job.*;
import cn.sliew.scaleph.common.dict.seatunnel.*;
Expand Down Expand Up @@ -87,6 +88,7 @@ public enum DictType implements DictDefinition {
FLINK_KUBERNETES_DEPLOYMENT_MODE("deployment_mode", "Deployment 模式", DeploymentMode.class),
FLINK_KUBERNETES_DEPLOYMENT_KIND("deployment_kind", "Deployment 类型", DeploymentKind.class),
FLINK_KUBERNETES_RESOURCE_LIFECYCLE_STATE("resource_lifecycle_state", "Deployment 状态", ResourceLifecycleState.class),
FLINK_KUBERNETES_UPGRADE_MODE("upgrade_mode", "Upgrade 方式", UpgradeMode.class),

FLINK_CATALOG_COLUMN_TYPE("flink_catalog_column_type", "Flink Catalog Table Schema 列类型", CatalogColumnType.class),
FLINK_CATALOG_TABLE_KIND("flink_catalog_table_kind", "Flink Catalog Table 类型", CatalogTableKind.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.common.dict.flink.kubernetes;

import cn.sliew.scaleph.common.dict.DictInstance;
import com.baomidou.mybatisplus.annotation.EnumValue;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFormat;

import java.util.Arrays;

@JsonFormat(shape = JsonFormat.Shape.OBJECT)
public enum UpgradeMode implements DictInstance {

    STATELESS("stateless", "stateless"),
    LAST_STATE("last-state", "last-state"),
    SAVEPOINT("savepoint", "savepoint"),
    ;

    /**
     * Resolves a persisted dictionary value to its {@link UpgradeMode} constant.
     *
     * @param value the stored dictionary value, e.g. {@code "savepoint"}
     * @return the matching enum constant
     * @throws EnumConstantNotPresentException if no constant has {@code value}
     */
    @JsonCreator
    public static UpgradeMode of(String value) {
        return Arrays.stream(values())
                .filter(instance -> instance.getValue().equals(value))
                .findAny().orElseThrow(() -> new EnumConstantNotPresentException(UpgradeMode.class, value));
    }

    // value is what MyBatis-Plus persists (@EnumValue); label is the display text.
    // Both are set once in the constructor, so mark them final.
    @EnumValue
    private final String value;
    private final String label;

    UpgradeMode(String value, String label) {
        this.value = value;
        this.label = label;
    }

    @Override
    public String getValue() {
        return value;
    }

    @Override
    public String getLabel() {
        return label;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cn.sliew.scaleph.dao.entity.master.ws;

import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.dao.entity.BaseDO;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.TableField;
Expand Down Expand Up @@ -50,6 +51,9 @@ public class WsFlinkKubernetesJobInstance extends BaseDO {
@TableField("parallelism")
private Integer parallelism;

@TableField("upgrade_mode")
private UpgradeMode upgradeMode;

@TableField("job_manager")
private String jobManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ private void addSeaTunnelArtifact(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO
List<String> args = Arrays.asList("--config", ResourceNames.SEATUNNEL_CONF_FILE_PATH);
jobSpec.setArgs(args.toArray(new String[2]));
spec.setJob(jobSpec);
seaTunnelConfHandler.handle(jobInstanceDTO.getWsFlinkKubernetesJob(), spec);
seaTunnelConfHandler.handle(jobInstanceDTO, spec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FileSystemPluginHandler;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkJobServiceHandler;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkStateStorageHandler;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -36,12 +37,15 @@ public class FlinkDeploymentSpecHandler {
private FileSystemPluginHandler fileSystemPluginHandler;
@Autowired
private FlinkStateStorageHandler flinkStateStorageHandler;
@Autowired
private FlinkJobServiceHandler flinkJobServiceHandler;

/**
 * Assembles the FlinkDeploymentSpec for a job instance by applying each
 * handler in turn: artifact, file system plugin, state storage, and rest
 * service exposure.
 *
 * @param jobInstanceDTO      job instance whose settings drive the spec
 * @param flinkDeploymentSpec existing spec to enrich; a fresh one is created when null
 * @return the enriched (or newly created) spec
 */
public FlinkDeploymentSpec handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec flinkDeploymentSpec) {
// reuse the caller-provided spec when present so earlier customizations survive
FlinkDeploymentSpec spec = Optional.ofNullable(flinkDeploymentSpec).orElse(new FlinkDeploymentSpec());
addArtifact(jobInstanceDTO, spec);
enableFileSystem(jobInstanceDTO, spec);
enableFlinkStateStore(jobInstanceDTO, spec);
addService(spec); // merges NodePort rest-service options (see FlinkJobServiceHandler)
return spec;
}

Expand All @@ -56,4 +60,8 @@ private void enableFileSystem(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Fl
// Lets FlinkStateStorageHandler add state-storage options to the spec's flink
// configuration, keyed by this instance's id. NOTE(review): the exact options
// written come from FlinkStateStorageHandler, which is not visible here.
private void enableFlinkStateStore(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
flinkStateStorageHandler.handle(jobInstanceDTO.getInstanceId(), spec.getFlinkConfiguration());
}

// Delegates to FlinkJobServiceHandler, which merges NodePort rest-service
// exposure options into the spec's flink configuration.
private void addService(FlinkDeploymentSpec spec) {
flinkJobServiceHandler.handle(spec);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler;

import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Component
public class FlinkJobServiceHandler {

    /**
     * Exposes the job's Flink rest service as a NodePort service by merging
     * the required options into the deployment spec's flink configuration.
     *
     * @param spec deployment spec to mutate; its flinkConfiguration map is
     *             created when absent
     */
    public void handle(FlinkDeploymentSpec spec) {
        // ofNullable, not of: getFlinkConfiguration() may be null, and
        // Optional.of(null) throws NPE, defeating the orElse fallback entirely.
        Map<String, String> flinkConfiguration = Optional.ofNullable(spec.getFlinkConfiguration()).orElseGet(HashMap::new);
        Map<String, String> nodePortService = FlinkServiceUtil.addNodePortService();
        flinkConfiguration.putAll(nodePortService);
        spec.setFlinkConfiguration(flinkConfiguration);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler;

import cn.sliew.scaleph.common.dict.flink.NodePortAddressType;
import cn.sliew.scaleph.common.dict.flink.ServiceExposedType;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.IngressSpec;

import java.util.HashMap;
import java.util.Map;

/**
 * Static helpers producing Flink configuration fragments and ingress templates
 * for exposing a deployment's rest service. Implemented as an empty enum so it
 * can never be instantiated.
 */
public enum FlinkServiceUtil {
    ;

    /** Options exposing the rest service as a NodePort advertised on node external IPs. */
    static Map<String, String> addNodePortService() {
        Map<String, String> options = new HashMap<>();
        options.put("kubernetes.rest-service.exposed.type", ServiceExposedType.NODE_PORT.getValue());
        options.put("kubernetes.rest-service.exposed.node-port-address-type", NodePortAddressType.EXTERNAL_IP.getValue());
        return options;
    }

    /** Options exposing the rest service through a LoadBalancer service. */
    static Map<String, String> addLoadBalancerService() {
        Map<String, String> options = new HashMap<>();
        options.put("kubernetes.rest-service.exposed.type", ServiceExposedType.LOAD_BALANCER.getValue());
        return options;
    }

    /** Ingress spec routing {@code /namespace/name/...} paths, rewritten for nginx. */
    static IngressSpec createIngressSpec() {
        Map<String, String> annotations = new HashMap<>();
        annotations.put("nginx.ingress.kubernetes.io/rewrite-target", "/$2");

        IngressSpec ingress = new IngressSpec();
        ingress.setTemplate("/{{namespace}}/{{name}}(/|$)(.*)");
        ingress.setClassName("nginx");
        ingress.setAnnotations(annotations);
        return ingress;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler;

import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// NOTE(review): class name is misspelled ("Sessioin" -> "Session"). Renaming is
// a breaking change for existing references, so it is only flagged here.
@Component
public class FlinkSessioinClusterServiceHandler {

    /**
     * Exposes a session cluster's Flink rest service through a LoadBalancer
     * service by merging the required options into the deployment spec's
     * flink configuration.
     *
     * @param spec deployment spec to mutate; its flinkConfiguration map is
     *             created when absent
     */
    public void handle(FlinkDeploymentSpec spec) {
        // ofNullable, not of: getFlinkConfiguration() may be null, and
        // Optional.of(null) throws NPE, defeating the orElse fallback entirely.
        Map<String, String> flinkConfiguration = Optional.ofNullable(spec.getFlinkConfiguration()).orElseGet(HashMap::new);
        Map<String, String> loadBalancerService = FlinkServiceUtil.addLoadBalancerService();
        flinkConfiguration.putAll(loadBalancerService);
        spec.setFlinkConfiguration(flinkConfiguration);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.config.resource.ResourceNames;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.engine.seatunnel.service.SeatunnelConfigService;
import cn.sliew.scaleph.engine.seatunnel.service.WsDiJobService;
import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobDTO;
Expand All @@ -42,30 +42,31 @@ public class SeaTunnelConfHandler {
@Autowired
private WsDiJobService wsDiJobService;

public void handle(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) {
public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder());
cusomizePodTemplate(jobDTO, podBuilder);
cusomizePodTemplate(jobInstanceDTO, podBuilder);
spec.setPodTemplate(podBuilder.build());
}

/**
 * Builds the ConfigMap that carries the SeaTunnel job configuration file.
 *
 * NOTE(review): this span contained both the pre- and post-change lines of a
 * diff hunk; only the post-change version (shared name formatter) is kept.
 *
 * @param instanceId instance id; used as the artifact name and ConfigMap name
 * @param wsDiJobId  id of the SeaTunnel DI job whose graph is rendered to config
 * @param objectMeta metadata template the ConfigMap's metadata is derived from
 * @return the ConfigMap holding the SeaTunnel conf file
 * @throws Exception if building or serializing the config fails
 */
public ConfigMap buildSeaTunnelConf(String instanceId, Long wsDiJobId, ObjectMeta objectMeta) throws Exception {
    WsDiJobDTO wsDiJobDTO = wsDiJobService.queryJobGraph(wsDiJobId);
    wsDiJobDTO.getWsFlinkArtifact().setName(instanceId);
    String prettyJson = seatunnelConfigService.buildConfig(wsDiJobDTO);
    // collapse the pretty-printed JSON to a single line before storing
    String plainJson = JacksonUtil.toJsonNode(prettyJson).toString();

    ConfigMapBuilder builder = new ConfigMapBuilder();
    builder.withNewMetadataLike(objectMeta)
            .withName(formatSeaTunnelConfigMapName(instanceId))
            .endMetadata()
            .withData(Map.of(ResourceNames.SEATUNNEL_CONF_FILE, plainJson));
    return builder.build();
}

private void cusomizePodTemplate(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) {
private void cusomizePodTemplate(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, PodBuilder builder) {
builder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME)
.endMetadata();
PodFluent.SpecNested<PodBuilder> spec = builder.editOrNewSpec();
spec.addAllToVolumes(buildVolume(jobDTO)); // add volumes
spec.addAllToVolumes(buildVolume(jobInstanceDTO)); // add volumes
if (spec.hasMatchingContainer(containerBuilder -> containerBuilder.getName().equals(ResourceNames.FLINK_MAIN_CONTAINER_NAME))) {
spec.editMatchingContainer((containerBuilder -> containerBuilder.getName().equals(ResourceNames.FLINK_MAIN_CONTAINER_NAME)))
.addAllToVolumeMounts(buildVolumeMount()) // add volume mount
Expand All @@ -87,17 +88,21 @@ private List<VolumeMount> buildVolumeMount() {
return Arrays.asList(seatunnelConf.build());
}

private List<Volume> buildVolume(WsFlinkKubernetesJobDTO jobDTO) {
private List<Volume> buildVolume(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) {
VolumeBuilder seatunnelConf = new VolumeBuilder();
seatunnelConf.withName(ResourceNames.SEATUNNEL_CONF_VOLUME_NAME);
ConfigMapVolumeSourceBuilder configMap = new ConfigMapVolumeSourceBuilder();
KeyToPath keyToPath = new KeyToPathBuilder()
.withKey(ResourceNames.SEATUNNEL_CONF_FILE)
.withPath(ResourceNames.SEATUNNEL_CONF_FILE)
.build();
configMap.withName(jobDTO.getJobId() + "-seatunnel-configmap")
configMap.withName(formatSeaTunnelConfigMapName(jobInstanceDTO.getInstanceId()))
.withItems(keyToPath);
seatunnelConf.withConfigMap(configMap.build());
return Arrays.asList(seatunnelConf.build());
}

/** Deterministic ConfigMap name shared by ConfigMap creation and volume mounting. */
private String formatSeaTunnelConfigMapName(String instanceId) {
    return instanceId + "-seatunnel-configmap";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cn.sliew.scaleph.engine.flink.kubernetes.service.dto;

import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.dao.entity.BaseDO;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.TaskManagerSpec;
Expand Down Expand Up @@ -52,6 +53,9 @@ public class WsFlinkKubernetesJobInstanceDTO extends BaseDO {
@Schema(description = "parallelism")
private Integer parallelism;

@Schema(description = "upgrade mode")
private UpgradeMode upgradeMode;

@Schema(description = "job manager")
private JobManagerSpec jobManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.engine.flink.kubernetes.service.param;

import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.TaskManagerSpec;
import io.swagger.v3.oas.annotations.media.Schema;
Expand All @@ -36,6 +37,9 @@ public class WsFlinkKubernetesJobInstanceDeployParam {
@Schema(description = "parallelism")
private Integer parallelism;

@Schema(description = "upgrade mode")
private UpgradeMode upgradeMode;

@Schema(description = "job manager")
private JobManagerSpec jobManager;

Expand Down
1 change: 1 addition & 0 deletions scaleph-ui-react/src/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export const DICT_TYPE = {

deploymentKind: 'deployment_kind',
resourceLifecycleState: 'resource_lifecycle_state',
upgradeMode: 'upgrade_mode',


};
Expand Down
2 changes: 2 additions & 0 deletions scaleph-ui-react/src/locales/zh-CN/pages/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ export default {
'app.common.operate.detail.label': '详情',
'app.common.operate.start.label': '启动',
'app.common.operate.stop.label': '停止',
'app.common.operate.deploy.label': 'Deploy',
'app.common.operate.shutdown.label': 'Shutdown',

'app.common.validate.characterWord': '只能输入字母数字、下划线',
'app.common.validate.characterWord2': '只能输入字母数字、下划线和点号',
Expand Down
Loading