Skip to content

Commit

Permalink
[Feature][scaleph-ui-react] add flink kubernetes job deploy form (#578)
Browse files Browse the repository at this point in the history
* feature: update flink kubernetes job instance service

* feature: add flink service handler

* fix: seatunnel example

* feature: add flink kubernetes job detail model

* feature: add flink kubernetes job detail model

* feature: add flink kubernetes job deploy form

* feature: add flink kubernetes job deploy form

* feature: add flink kubernetes job deploy form
  • Loading branch information
kalencaya authored Jul 21, 2023
1 parent 0026597 commit e6f75bf
Show file tree
Hide file tree
Showing 22 changed files with 570 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind;
import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentMode;
import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.common.dict.image.ImagePullPolicy;
import cn.sliew.scaleph.common.dict.job.*;
import cn.sliew.scaleph.common.dict.seatunnel.*;
Expand Down Expand Up @@ -87,6 +88,7 @@ public enum DictType implements DictDefinition {
FLINK_KUBERNETES_DEPLOYMENT_MODE("deployment_mode", "Deployment 模式", DeploymentMode.class),
FLINK_KUBERNETES_DEPLOYMENT_KIND("deployment_kind", "Deployment 类型", DeploymentKind.class),
FLINK_KUBERNETES_RESOURCE_LIFECYCLE_STATE("resource_lifecycle_state", "Deployment 状态", ResourceLifecycleState.class),
FLINK_KUBERNETES_UPGRADE_MODE("upgrade_mode", "Upgrade 方式", UpgradeMode.class),

FLINK_CATALOG_COLUMN_TYPE("flink_catalog_column_type", "Flink Catalog Table Schema 列类型", CatalogColumnType.class),
FLINK_CATALOG_TABLE_KIND("flink_catalog_table_kind", "Flink Catalog Table 类型", CatalogTableKind.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.common.dict.flink.kubernetes;

import cn.sliew.scaleph.common.dict.DictInstance;
import com.baomidou.mybatisplus.annotation.EnumValue;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFormat;

import java.util.Arrays;

@JsonFormat(shape = JsonFormat.Shape.OBJECT)
public enum UpgradeMode implements DictInstance {

STATELESS("stateless", "stateless"),
LAST_STATE("last-state", "last-state"),
SAVEPOINT("savepoint", "savepoint"),
;

@JsonCreator
public static UpgradeMode of(String value) {
return Arrays.stream(values())
.filter(instance -> instance.getValue().equals(value))
.findAny().orElseThrow(() -> new EnumConstantNotPresentException(UpgradeMode.class, value));
}

@EnumValue
private String value;
private String label;

UpgradeMode(String value, String label) {
this.value = value;
this.label = label;
}

@Override
public String getValue() {
return value;
}

@Override
public String getLabel() {
return label;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cn.sliew.scaleph.dao.entity.master.ws;

import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.dao.entity.BaseDO;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.TableField;
Expand Down Expand Up @@ -50,6 +51,9 @@ public class WsFlinkKubernetesJobInstance extends BaseDO {
@TableField("parallelism")
private Integer parallelism;

@TableField("upgrade_mode")
private UpgradeMode upgradeMode;

@TableField("job_manager")
private String jobManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ private void addSeaTunnelArtifact(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO
List<String> args = Arrays.asList("--config", ResourceNames.SEATUNNEL_CONF_FILE_PATH);
jobSpec.setArgs(args.toArray(new String[2]));
spec.setJob(jobSpec);
seaTunnelConfHandler.handle(jobInstanceDTO.getWsFlinkKubernetesJob(), spec);
seaTunnelConfHandler.handle(jobInstanceDTO, spec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FileSystemPluginHandler;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkJobServiceHandler;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkStateStorageHandler;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -36,12 +37,15 @@ public class FlinkDeploymentSpecHandler {
private FileSystemPluginHandler fileSystemPluginHandler;
@Autowired
private FlinkStateStorageHandler flinkStateStorageHandler;
@Autowired
private FlinkJobServiceHandler flinkJobServiceHandler;

public FlinkDeploymentSpec handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec flinkDeploymentSpec) {
FlinkDeploymentSpec spec = Optional.ofNullable(flinkDeploymentSpec).orElse(new FlinkDeploymentSpec());
addArtifact(jobInstanceDTO, spec);
enableFileSystem(jobInstanceDTO, spec);
enableFlinkStateStore(jobInstanceDTO, spec);
addService(spec);
return spec;
}

Expand All @@ -56,4 +60,8 @@ private void enableFileSystem(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Fl
private void enableFlinkStateStore(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
flinkStateStorageHandler.handle(jobInstanceDTO.getInstanceId(), spec.getFlinkConfiguration());
}

private void addService(FlinkDeploymentSpec spec) {
flinkJobServiceHandler.handle(spec);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler;

import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Component
public class FlinkJobServiceHandler {

public void handle(FlinkDeploymentSpec spec) {
Map<String, String> flinkConfiguration = Optional.of(spec.getFlinkConfiguration()).orElse(new HashMap());
Map<String, String> nodePortService = FlinkServiceUtil.addNodePortService();
flinkConfiguration.putAll(nodePortService);
spec.setFlinkConfiguration(flinkConfiguration);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler;

import cn.sliew.scaleph.common.dict.flink.NodePortAddressType;
import cn.sliew.scaleph.common.dict.flink.ServiceExposedType;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.IngressSpec;

import java.util.HashMap;
import java.util.Map;

public enum FlinkServiceUtil {
;

static Map<String, String> addNodePortService() {
Map<String, String> flinkConfiguration = new HashMap<>();
flinkConfiguration.put("kubernetes.rest-service.exposed.type", ServiceExposedType.NODE_PORT.getValue());
flinkConfiguration.put("kubernetes.rest-service.exposed.node-port-address-type", NodePortAddressType.EXTERNAL_IP.getValue());
return flinkConfiguration;
}

static Map<String, String> addLoadBalancerService() {
Map<String, String> flinkConfiguration = new HashMap<>();
flinkConfiguration.put("kubernetes.rest-service.exposed.type", ServiceExposedType.LOAD_BALANCER.getValue());
return flinkConfiguration;
}

static IngressSpec createIngressSpec() {
IngressSpec spec = new IngressSpec();
spec.setTemplate("/{{namespace}}/{{name}}(/|$)(.*)");
spec.setClassName("nginx");
Map<String, String> annotations = new HashMap<>();
annotations.put("nginx.ingress.kubernetes.io/rewrite-target", "/$2");
spec.setAnnotations(annotations);
return spec;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler;

import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Component
public class FlinkSessioinClusterServiceHandler {

public void handle(FlinkDeploymentSpec spec) {
Map<String, String> flinkConfiguration = Optional.of(spec.getFlinkConfiguration()).orElse(new HashMap());
Map<String, String> loadBalancerService = FlinkServiceUtil.addLoadBalancerService();
flinkConfiguration.putAll(loadBalancerService);
spec.setFlinkConfiguration(flinkConfiguration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.config.resource.ResourceNames;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.engine.seatunnel.service.SeatunnelConfigService;
import cn.sliew.scaleph.engine.seatunnel.service.WsDiJobService;
import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobDTO;
Expand All @@ -42,30 +42,31 @@ public class SeaTunnelConfHandler {
@Autowired
private WsDiJobService wsDiJobService;

public void handle(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) {
public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder());
cusomizePodTemplate(jobDTO, podBuilder);
cusomizePodTemplate(jobInstanceDTO, podBuilder);
spec.setPodTemplate(podBuilder.build());
}

public ConfigMap buildSeaTunnelConf(String instanceId, Long wsDiJobId, ObjectMeta objectMeta) throws Exception {
WsDiJobDTO wsDiJobDTO = wsDiJobService.queryJobGraph(wsDiJobId);
wsDiJobDTO.getWsFlinkArtifact().setName(instanceId);
String prettyJson = seatunnelConfigService.buildConfig(wsDiJobDTO);
String plainJson = JacksonUtil.toJsonNode(prettyJson).toString();

ConfigMapBuilder builder = new ConfigMapBuilder();
builder.withNewMetadataLike(objectMeta)
.withName(instanceId + "-seatunnel-configmap")
.withName(formatSeaTunnelConfigMapName(instanceId))
.endMetadata()
.withData(Map.of(ResourceNames.SEATUNNEL_CONF_FILE, plainJson));
return builder.build();
}

private void cusomizePodTemplate(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) {
private void cusomizePodTemplate(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, PodBuilder builder) {
builder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME)
.endMetadata();
PodFluent.SpecNested<PodBuilder> spec = builder.editOrNewSpec();
spec.addAllToVolumes(buildVolume(jobDTO)); // add volumes
spec.addAllToVolumes(buildVolume(jobInstanceDTO)); // add volumes
if (spec.hasMatchingContainer(containerBuilder -> containerBuilder.getName().equals(ResourceNames.FLINK_MAIN_CONTAINER_NAME))) {
spec.editMatchingContainer((containerBuilder -> containerBuilder.getName().equals(ResourceNames.FLINK_MAIN_CONTAINER_NAME)))
.addAllToVolumeMounts(buildVolumeMount()) // add volume mount
Expand All @@ -87,17 +88,21 @@ private List<VolumeMount> buildVolumeMount() {
return Arrays.asList(seatunnelConf.build());
}

private List<Volume> buildVolume(WsFlinkKubernetesJobDTO jobDTO) {
private List<Volume> buildVolume(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) {
VolumeBuilder seatunnelConf = new VolumeBuilder();
seatunnelConf.withName(ResourceNames.SEATUNNEL_CONF_VOLUME_NAME);
ConfigMapVolumeSourceBuilder configMap = new ConfigMapVolumeSourceBuilder();
KeyToPath keyToPath = new KeyToPathBuilder()
.withKey(ResourceNames.SEATUNNEL_CONF_FILE)
.withPath(ResourceNames.SEATUNNEL_CONF_FILE)
.build();
configMap.withName(jobDTO.getJobId() + "-seatunnel-configmap")
configMap.withName(formatSeaTunnelConfigMapName(jobInstanceDTO.getInstanceId()))
.withItems(keyToPath);
seatunnelConf.withConfigMap(configMap.build());
return Arrays.asList(seatunnelConf.build());
}

private String formatSeaTunnelConfigMapName(String instanceId) {
return String.format("%s-seatunnel-configmap", instanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cn.sliew.scaleph.engine.flink.kubernetes.service.dto;

import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.dao.entity.BaseDO;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.TaskManagerSpec;
Expand Down Expand Up @@ -52,6 +53,9 @@ public class WsFlinkKubernetesJobInstanceDTO extends BaseDO {
@Schema(description = "parallelism")
private Integer parallelism;

@Schema(description = "upgrade mode")
private UpgradeMode upgradeMode;

@Schema(description = "job manager")
private JobManagerSpec jobManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.engine.flink.kubernetes.service.param;

import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.TaskManagerSpec;
import io.swagger.v3.oas.annotations.media.Schema;
Expand All @@ -36,6 +37,9 @@ public class WsFlinkKubernetesJobInstanceDeployParam {
@Schema(description = "parallelism")
private Integer parallelism;

@Schema(description = "upgrade mode")
private UpgradeMode upgradeMode;

@Schema(description = "job manager")
private JobManagerSpec jobManager;

Expand Down
1 change: 1 addition & 0 deletions scaleph-ui-react/src/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export const DICT_TYPE = {

deploymentKind: 'deployment_kind',
resourceLifecycleState: 'resource_lifecycle_state',
upgradeMode: 'upgrade_mode',


};
Expand Down
2 changes: 2 additions & 0 deletions scaleph-ui-react/src/locales/zh-CN/pages/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ export default {
'app.common.operate.detail.label': '详情',
'app.common.operate.start.label': '启动',
'app.common.operate.stop.label': '停止',
'app.common.operate.deploy.label': 'Deploy',
'app.common.operate.shutdown.label': 'Shutdown',

'app.common.validate.characterWord': '只能输入字母数字、下划线',
'app.common.validate.characterWord2': '只能输入字母数字、下划线和点号',
Expand Down
Loading

0 comments on commit e6f75bf

Please sign in to comment.