diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java index e676db99c..58e9aa44f 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java @@ -28,6 +28,7 @@ import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind; import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentMode; import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; +import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode; import cn.sliew.scaleph.common.dict.image.ImagePullPolicy; import cn.sliew.scaleph.common.dict.job.*; import cn.sliew.scaleph.common.dict.seatunnel.*; @@ -87,6 +88,7 @@ public enum DictType implements DictDefinition { FLINK_KUBERNETES_DEPLOYMENT_MODE("deployment_mode", "Deployment 模式", DeploymentMode.class), FLINK_KUBERNETES_DEPLOYMENT_KIND("deployment_kind", "Deployment 类型", DeploymentKind.class), FLINK_KUBERNETES_RESOURCE_LIFECYCLE_STATE("resource_lifecycle_state", "Deployment 状态", ResourceLifecycleState.class), + FLINK_KUBERNETES_UPGRADE_MODE("upgrade_mode", "Upgrade 方式", UpgradeMode.class), FLINK_CATALOG_COLUMN_TYPE("flink_catalog_column_type", "Flink Catalog Table Schema 列类型", CatalogColumnType.class), FLINK_CATALOG_TABLE_KIND("flink_catalog_table_kind", "Flink Catalog Table 类型", CatalogTableKind.class), diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/UpgradeMode.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/UpgradeMode.java new file mode 100644 index 000000000..4ac139391 --- /dev/null +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/UpgradeMode.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.common.dict.flink.kubernetes; + +import cn.sliew.scaleph.common.dict.DictInstance; +import com.baomidou.mybatisplus.annotation.EnumValue; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonFormat; + +import java.util.Arrays; + +@JsonFormat(shape = JsonFormat.Shape.OBJECT) +public enum UpgradeMode implements DictInstance { + + STATELESS("stateless", "stateless"), + LAST_STATE("last-state", "last-state"), + SAVEPOINT("savepoint", "savepoint"), + ; + + @JsonCreator + public static UpgradeMode of(String value) { + return Arrays.stream(values()) + .filter(instance -> instance.getValue().equals(value)) + .findAny().orElseThrow(() -> new EnumConstantNotPresentException(UpgradeMode.class, value)); + } + + @EnumValue + private String value; + private String label; + + UpgradeMode(String value, String label) { + this.value = value; + this.label = label; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getLabel() { + return label; + } +} diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java index 3accdaa4d..40f8450c4 100644 --- a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java @@ -19,6 +19,7 @@ package cn.sliew.scaleph.dao.entity.master.ws; import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; +import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode; import cn.sliew.scaleph.dao.entity.BaseDO; import com.baomidou.mybatisplus.annotation.FieldStrategy; import com.baomidou.mybatisplus.annotation.TableField; @@ -50,6 +51,9 @@ public class WsFlinkKubernetesJobInstance extends BaseDO { @TableField("parallelism") private Integer parallelism; + @TableField("upgrade_mode") + private UpgradeMode upgradeMode; + @TableField("job_manager") private String jobManager; diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentArtifactHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentArtifactHandler.java index c47da4ce3..f198c3153 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentArtifactHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentArtifactHandler.java @@ -73,6 +73,6 @@ private void addSeaTunnelArtifact(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO List args = Arrays.asList("--config", ResourceNames.SEATUNNEL_CONF_FILE_PATH); jobSpec.setArgs(args.toArray(new String[2])); spec.setJob(jobSpec); - seaTunnelConfHandler.handle(jobInstanceDTO.getWsFlinkKubernetesJob(), spec); + seaTunnelConfHandler.handle(jobInstanceDTO, spec); } } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java index 5285880d3..c5768413f 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java @@ -20,6 +20,7 @@ import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FileSystemPluginHandler; +import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkJobServiceHandler; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkStateStorageHandler; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; import org.springframework.beans.factory.annotation.Autowired; @@ -36,12 +37,15 @@ public class FlinkDeploymentSpecHandler { private FileSystemPluginHandler fileSystemPluginHandler; @Autowired private FlinkStateStorageHandler flinkStateStorageHandler; + @Autowired + private FlinkJobServiceHandler flinkJobServiceHandler; public FlinkDeploymentSpec handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec flinkDeploymentSpec) { FlinkDeploymentSpec spec = Optional.ofNullable(flinkDeploymentSpec).orElse(new FlinkDeploymentSpec()); addArtifact(jobInstanceDTO, spec); enableFileSystem(jobInstanceDTO, spec); enableFlinkStateStore(jobInstanceDTO, spec); + addService(spec); return spec; } @@ -56,4 +60,8 @@ private void enableFileSystem(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Fl private void enableFlinkStateStore(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { flinkStateStorageHandler.handle(jobInstanceDTO.getInstanceId(), spec.getFlinkConfiguration()); } + + private void addService(FlinkDeploymentSpec spec) { + flinkJobServiceHandler.handle(spec); + } } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkJobServiceHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkJobServiceHandler.java new file mode 100644 index 000000000..31dee7520 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkJobServiceHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler; + +import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +@Component +public class FlinkJobServiceHandler { + + public void handle(FlinkDeploymentSpec spec) { + Map flinkConfiguration = Optional.of(spec.getFlinkConfiguration()).orElse(new HashMap()); + Map nodePortService = FlinkServiceUtil.addNodePortService(); + flinkConfiguration.putAll(nodePortService); + spec.setFlinkConfiguration(flinkConfiguration); + } +} diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkServiceUtil.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkServiceUtil.java new file mode 100644 index 000000000..93f8cf0b0 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkServiceUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler; + +import cn.sliew.scaleph.common.dict.flink.NodePortAddressType; +import cn.sliew.scaleph.common.dict.flink.ServiceExposedType; +import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.IngressSpec; + +import java.util.HashMap; +import java.util.Map; + +public enum FlinkServiceUtil { + ; + + static Map addNodePortService() { + Map flinkConfiguration = new HashMap<>(); + flinkConfiguration.put("kubernetes.rest-service.exposed.type", ServiceExposedType.NODE_PORT.getValue()); + flinkConfiguration.put("kubernetes.rest-service.exposed.node-port-address-type", NodePortAddressType.EXTERNAL_IP.getValue()); + return flinkConfiguration; + } + + static Map addLoadBalancerService() { + Map flinkConfiguration = new HashMap<>(); + flinkConfiguration.put("kubernetes.rest-service.exposed.type", ServiceExposedType.LOAD_BALANCER.getValue()); + return flinkConfiguration; + } + + static IngressSpec createIngressSpec() { + IngressSpec spec = new IngressSpec(); + spec.setTemplate("/{{namespace}}/{{name}}(/|$)(.*)"); + spec.setClassName("nginx"); + Map annotations = new HashMap<>(); + annotations.put("nginx.ingress.kubernetes.io/rewrite-target", "/$2"); + spec.setAnnotations(annotations); + return spec; + } + +} diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkSessioinClusterServiceHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkSessioinClusterServiceHandler.java new file mode 100644 index 000000000..2bf033b3a --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkSessioinClusterServiceHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler; + +import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +@Component +public class FlinkSessioinClusterServiceHandler { + + public void handle(FlinkDeploymentSpec spec) { + Map flinkConfiguration = Optional.of(spec.getFlinkConfiguration()).orElse(new HashMap()); + Map loadBalancerService = FlinkServiceUtil.addLoadBalancerService(); + flinkConfiguration.putAll(loadBalancerService); + spec.setFlinkConfiguration(flinkConfiguration); + } +} diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/SeaTunnelConfHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/SeaTunnelConfHandler.java index 2449429a3..5d41eb6c9 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/SeaTunnelConfHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/SeaTunnelConfHandler.java @@ -21,7 +21,7 @@ import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.config.resource.ResourceNames; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; -import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO; +import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; import cn.sliew.scaleph.engine.seatunnel.service.SeatunnelConfigService; import cn.sliew.scaleph.engine.seatunnel.service.WsDiJobService; import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobDTO; @@ -42,30 +42,31 @@ public class SeaTunnelConfHandler { @Autowired private WsDiJobService wsDiJobService; - public void handle(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) { + public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder()); - cusomizePodTemplate(jobDTO, podBuilder); + cusomizePodTemplate(jobInstanceDTO, podBuilder); spec.setPodTemplate(podBuilder.build()); } public ConfigMap buildSeaTunnelConf(String instanceId, Long wsDiJobId, ObjectMeta objectMeta) throws Exception { WsDiJobDTO wsDiJobDTO = wsDiJobService.queryJobGraph(wsDiJobId); + wsDiJobDTO.getWsFlinkArtifact().setName(instanceId); String prettyJson = seatunnelConfigService.buildConfig(wsDiJobDTO); String plainJson = JacksonUtil.toJsonNode(prettyJson).toString(); ConfigMapBuilder builder = new ConfigMapBuilder(); builder.withNewMetadataLike(objectMeta) - .withName(instanceId + "-seatunnel-configmap") + .withName(formatSeaTunnelConfigMapName(instanceId)) .endMetadata() .withData(Map.of(ResourceNames.SEATUNNEL_CONF_FILE, plainJson)); return builder.build(); } - private void cusomizePodTemplate(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) { + private void cusomizePodTemplate(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, PodBuilder builder) { builder.editOrNewMetadata().withName(ResourceNames.POD_TEMPLATE_NAME) .endMetadata(); PodFluent.SpecNested spec = builder.editOrNewSpec(); - spec.addAllToVolumes(buildVolume(jobDTO)); // add volumes + spec.addAllToVolumes(buildVolume(jobInstanceDTO)); // add volumes if (spec.hasMatchingContainer(containerBuilder -> containerBuilder.getName().equals(ResourceNames.FLINK_MAIN_CONTAINER_NAME))) { spec.editMatchingContainer((containerBuilder -> containerBuilder.getName().equals(ResourceNames.FLINK_MAIN_CONTAINER_NAME))) .addAllToVolumeMounts(buildVolumeMount()) // add volume mount @@ -87,7 +88,7 @@ private List buildVolumeMount() { return Arrays.asList(seatunnelConf.build()); } - private List buildVolume(WsFlinkKubernetesJobDTO jobDTO) { + private List buildVolume(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) { VolumeBuilder seatunnelConf = new VolumeBuilder(); seatunnelConf.withName(ResourceNames.SEATUNNEL_CONF_VOLUME_NAME); ConfigMapVolumeSourceBuilder configMap = new ConfigMapVolumeSourceBuilder(); @@ -95,9 +96,13 @@ private List buildVolume(WsFlinkKubernetesJobDTO jobDTO) { .withKey(ResourceNames.SEATUNNEL_CONF_FILE) .withPath(ResourceNames.SEATUNNEL_CONF_FILE) .build(); - configMap.withName(jobDTO.getJobId() + "-seatunnel-configmap") + configMap.withName(formatSeaTunnelConfigMapName(jobInstanceDTO.getInstanceId())) .withItems(keyToPath); seatunnelConf.withConfigMap(configMap.build()); return Arrays.asList(seatunnelConf.build()); } + + private String formatSeaTunnelConfigMapName(String instanceId) { + return String.format("%s-seatunnel-configmap", instanceId); + } } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java index 84665aa51..ccdfd782f 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java @@ -19,6 +19,7 @@ package cn.sliew.scaleph.engine.flink.kubernetes.service.dto; import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; +import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode; import cn.sliew.scaleph.dao.entity.BaseDO; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.TaskManagerSpec; @@ -52,6 +53,9 @@ public class WsFlinkKubernetesJobInstanceDTO extends BaseDO { @Schema(description = "parallelism") private Integer parallelism; + @Schema(description = "upgrade mode") + private UpgradeMode upgradeMode; + @Schema(description = "job manager") private JobManagerSpec jobManager; diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java index 9567bd5be..4b8582e45 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java @@ -18,6 +18,7 @@ package cn.sliew.scaleph.engine.flink.kubernetes.service.param; +import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.TaskManagerSpec; import io.swagger.v3.oas.annotations.media.Schema; @@ -36,6 +37,9 @@ public class WsFlinkKubernetesJobInstanceDeployParam { @Schema(description = "parallelism") private Integer parallelism; + @Schema(description = "upgrade mode") + private UpgradeMode upgradeMode; + @Schema(description = "job manager") private JobManagerSpec jobManager; diff --git a/scaleph-ui-react/src/constant.ts b/scaleph-ui-react/src/constant.ts index 63ff4d45c..1d18bc95b 100644 --- a/scaleph-ui-react/src/constant.ts +++ b/scaleph-ui-react/src/constant.ts @@ -51,6 +51,7 @@ export const DICT_TYPE = { deploymentKind: 'deployment_kind', resourceLifecycleState: 'resource_lifecycle_state', + upgradeMode: 'upgrade_mode', }; diff --git a/scaleph-ui-react/src/locales/zh-CN/pages/base.ts b/scaleph-ui-react/src/locales/zh-CN/pages/base.ts index 51be6ce79..2ff2fae0e 100644 --- a/scaleph-ui-react/src/locales/zh-CN/pages/base.ts +++ b/scaleph-ui-react/src/locales/zh-CN/pages/base.ts @@ -58,6 +58,8 @@ export default { 'app.common.operate.detail.label': '详情', 'app.common.operate.start.label': '启动', 'app.common.operate.stop.label': '停止', + 'app.common.operate.deploy.label': 'Deploy', + 'app.common.operate.shutdown.label': 'Shutdown', 'app.common.validate.characterWord': '只能输入字母数字、下划线', 'app.common.validate.characterWord2': '只能输入字母数字、下划线和点号', diff --git a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts index 8f3c7da81..71fcd1c8c 100644 --- a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts +++ b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts @@ -794,6 +794,18 @@ export default { 'pages.project.flink.kubernetes.job.detail': '详情', 'pages.project.flink.kubernetes.job.detail.deploy': 'Deploy', + 'pages.project.flink.kubernetes.job.detail.deploy.resource': 'Resource', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.jobManagerCpu': 'JobManager CPU', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.jobManagerMemory': 'JobManager Memory', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.jobManagerReplicas': 'JobManager Replicas', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.taskManagerCpu': 'TaskManager CPU', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.taskManagerMemory': 'TaskManager Memory', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.taskManagerReplicas': 'TaskManager Replicas', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.parallelism': 'Parallelism', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.userFlinkConfiguration': 'Flink配置', + 'pages.project.flink.kubernetes.job.detail.deploy.state': 'State', + 'pages.project.flink.kubernetes.job.detail.deploy.state.upgradeMode': 'Upgrade 方式', + 'pages.project.flink.kubernetes.job.detail.suspend': 'Suspend', 'pages.project.flink.kubernetes.job.detail.shutdown': 'Shutdown', 'pages.project.flink.kubernetes.job.detail.savepoint': 'Savepoint', @@ -801,6 +813,8 @@ export default { 'pages.project.flink.kubernetes.job.detail.metrics': 'Metrics', 'pages.project.flink.kubernetes.job.detail.logs': 'Logs', + + 'pages.project.flink.kubernetes.job.detail.yaml': 'YAML', }; diff --git a/scaleph-ui-react/src/models/jobDetail.ts b/scaleph-ui-react/src/models/jobDetail.ts new file mode 100644 index 000000000..7a1511aa7 --- /dev/null +++ b/scaleph-ui-react/src/models/jobDetail.ts @@ -0,0 +1,45 @@ +import {WsFlinkKubernetesJob} from "@/services/project/typings"; +import {Effect, Reducer} from "umi"; +import {WsFlinkKubernetesJobService} from "@/services/project/WsFlinkKubernetesJobService"; + +export interface StateType { + job: WsFlinkKubernetesJob, +} + +export interface ModelType { + namespace: string; + + state: StateType; + + effects: { + queryJob: Effect; + }; + + reducers: { + updateJob: Reducer; + }; +} + +const model: ModelType = { + state: { + job: null + }, + + effects: { + *queryJob({payload}, {call, put}) { + const {data} = yield call(WsFlinkKubernetesJobService.selectOne, payload); + yield put({type: 'updateJob', payload: {job: data}}); + }, + }, + + reducers: { + updateJob(state, {payload}) { + return { + ...state, + job: payload.job + }; + }, + }, +}; + +export default model; diff --git a/scaleph-ui-react/src/models/sessionClusterStep2.ts b/scaleph-ui-react/src/models/sessionClusterStep2.ts deleted file mode 100644 index 8701a33df..000000000 --- a/scaleph-ui-react/src/models/sessionClusterStep2.ts +++ /dev/null @@ -1,58 +0,0 @@ -import {WsFlinkKubernetesSessionCluster} from "@/services/project/typings"; -import {Effect, Reducer} from "umi"; -import {WsFlinkKubernetesSessionClusterService} from "@/services/project/WsFlinkKubernetesSessionClusterService"; - -export interface StateType { - templateId: number, - sessionCluster: WsFlinkKubernetesSessionCluster, -} - -export interface ModelType { - namespace: string; - - state: StateType; - - effects: { - queryTemplate: Effect; - }; - - reducers: { - updateTemplate: Reducer; - }; -} - -const model: ModelType = { - state: { - templateId: null, - sessionCluster: null, - }, - - effects: { - *queryTemplate({payload}, {call, put}) { - const {data} = yield call(WsFlinkKubernetesSessionClusterService.fromTemplate, payload); - yield put({type: 'updateSessionCluster', payload: {templateId: payload, sessionCluster: data}}); - }, - - *editSessionCluster({payload}, {call, put}) { - yield put({type: 'updateSessionClusterOnly', payload: {sessionCluster: payload}}); - }, - }, - - reducers: { - updateSessionCluster(state, {payload}) { - return { - ...state, - templateId: payload.templateId, - sessionCluster: payload.sessionCluster - }; - }, - updateSessionClusterOnly(state, {payload}) { - return { - ...state, - sessionCluster: payload.sessionCluster - }; - }, - }, -}; - -export default model; diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployResourceStepForm.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployResourceStepForm.tsx new file mode 100644 index 000000000..f6f56b1f2 --- /dev/null +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployResourceStepForm.tsx @@ -0,0 +1,83 @@ +import {useIntl} from "umi"; +import React from "react"; +import {ProFormDigit, ProFormGroup, ProFormText, ProFormTextArea} from "@ant-design/pro-components"; + +const FlinkKubernetesJobDeployResourceStepForm: React.FC = () => { + const intl = useIntl(); + + return ( + + + + + + + + + + + ); +} + +export default FlinkKubernetesJobDeployResourceStepForm; diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm.tsx new file mode 100644 index 000000000..9ce24d922 --- /dev/null +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm.tsx @@ -0,0 +1,25 @@ +import {useIntl} from "umi"; +import React from "react"; +import {ProFormGroup, ProFormRadio} from "@ant-design/pro-components"; +import {DictDataService} from "@/services/admin/dictData.service"; +import {DICT_TYPE} from "@/constant"; + +const FlinkKubernetesJobDeployStateStepForm: React.FC = () => { + const intl = useIntl(); + + return ( + + { + return DictDataService.listDictDataByType2(DICT_TYPE.upgradeMode) + }} + /> + + + ); +} + +export default FlinkKubernetesJobDeployStateStepForm; diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx new file mode 100644 index 000000000..ff074aad7 --- /dev/null +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx @@ -0,0 +1,88 @@ +import {useIntl} from "umi"; +import React, {useRef} from "react"; +import {message, Modal} from "antd"; +import {ProCard, ProFormInstance, StepsForm} from "@ant-design/pro-components"; +import {ModalFormProps} from '@/app.d'; +import {WsFlinkKubernetesJob, WsFlinkKubernetesJobInstanceDeployParam} from "@/services/project/typings"; +import FlinkKubernetesJobDeployResourceStepForm + from "@/pages/Project/Workspace/Kubernetes/Job/Detail/DeployResourceStepForm"; +import FlinkKubernetesJobDeployStateStepForm from "@/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm"; +import {WsFlinkKubernetesJobService} from "@/services/project/WsFlinkKubernetesJobService"; + +const FlinkKubernetesJobDeployForm: React.FC> = ({ + data, + visible, + onVisibleChange, + onCancel + }) => { + const intl = useIntl(); + const formRef = useRef(); + + return ( + + + ) => { + const jobManagerSpec = { + resource: { + cpu: values.jobManagerCpu, + memory: values.jobManagerMemory, + }, + replicas: values.jobManagerReplicas + } + const taskManagerSpec = { + resource: { + cpu: values.taskManagerCpu, + memory: values.taskManagerMemory, + }, + replicas: values.taskManagerReplicas + } + const param: WsFlinkKubernetesJobInstanceDeployParam = { + wsFlinkKubernetesJobId: data.id, + jobManager: jobManagerSpec, + taskManager: taskManagerSpec, + parallelism: values.parallelism, + upgradeMode: values.upgradeMode, + userFlinkConfiguration: values.userFlinkConfiguration + } + return WsFlinkKubernetesJobService.deploy(param).then((response) => { + if (response.success) { + message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); + onVisibleChange(false); + } + }) + }} + > + + + + + + + + + + + ); +} + +export default FlinkKubernetesJobDeployForm; diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/YAML/index.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/YAML/index.tsx index e17368c1d..763526173 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/YAML/index.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/YAML/index.tsx @@ -1,12 +1,11 @@ import React, {useEffect, useRef, useState} from "react"; import Editor, {Monaco, useMonaco} from "@monaco-editor/react"; -import YAML from "yaml"; import {Props} from '@/app.d'; import {WsFlinkKubernetesJob} from "@/services/project/typings"; import {WsFlinkKubernetesJobService} from "@/services/project/WsFlinkKubernetesJobService"; +import {connect} from "umi"; - -const FlinkKubernetesJobDetailYAMLWeb: React.FC> = ({data}) => { +const FlinkKubernetesJobDetailYAMLWeb: React.FC> = (props: any) => { const editorRef = useRef(null); const monaco = useMonaco(); @@ -19,10 +18,14 @@ const FlinkKubernetesJobDetailYAMLWeb: React.FC> = ( }, [monaco]); useEffect(() => { - WsFlinkKubernetesJobService.asYaml(data.id).then((response) => { - setJob(response.data) - }) - }, []); + if (props.jobDetail.job) { + WsFlinkKubernetesJobService.asYaml(props.jobDetail.job?.id).then((response) => { + if (response.success) { + setJob(response.data) + } + }) + } + }, [props.jobDetail.job]); const handleEditorDidMount = (editor, monaco: Monaco) => { editorRef.current = editor; @@ -47,4 +50,6 @@ const FlinkKubernetesJobDetailYAMLWeb: React.FC> = ( ); } -export default FlinkKubernetesJobDetailYAMLWeb; + +const mapModelToProps = ({jobDetail}: any) => ({jobDetail}) +export default connect(mapModelToProps)(FlinkKubernetesJobDetailYAMLWeb); diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx index 1b288a87d..5e0b42459 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx @@ -1,12 +1,11 @@ -import {useAccess, useIntl, useLocation} from "umi"; -import React from "react"; +import {connect, useAccess, useIntl, useLocation} from "umi"; +import React, {useEffect, useState} from "react"; import {Button, message, Popconfirm, Tabs} from "antd"; import FlinkKubernetesJobDetailYAMLWeb from "@/pages/Project/Workspace/Kubernetes/Job/Detail/YAML"; import {ProDescriptionsItemProps} from "@ant-design/pro-descriptions"; import {WsFlinkKubernetesJob} from "@/services/project/typings"; import {PageContainer, ProDescriptions} from "@ant-design/pro-components"; import {ProCoreActionType} from "@ant-design/pro-utils/es/typing"; -import {WsFlinkKubernetesSessionClusterService} from "@/services/project/WsFlinkKubernetesSessionClusterService"; import { AreaChartOutlined, CameraOutlined, @@ -17,11 +16,35 @@ import { PauseOutlined } from "@ant-design/icons"; import {WsFlinkKubernetesJobService} from "@/services/project/WsFlinkKubernetesJobService"; +import FlinkKubernetesJobForm from "@/pages/Project/Workspace/Kubernetes/Job/JobForm"; +import FlinkKubernetesJobDeployForm from "@/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm"; -const FlinkKubernetesJobDetailWeb: React.FC = () => { +const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { const intl = useIntl(); const access = useAccess(); const urlParams = useLocation() + const [jobDeployFormData, setJobDeployFormData] = useState<{ + visiable: boolean; + data: WsFlinkKubernetesJob; + }>({visiable: false, data: {}}); + + useEffect(() => { + const data = urlParams.state as WsFlinkKubernetesJob + refreshJob(data.id) + let timer = setInterval(() => { + refreshJob(data.id) + }, 3000); + return () => { + clearInterval(timer); + }; + }, []); + + const refreshJob = (id: number) => { + props.dispatch({ + type: 'jobDetail/queryJob', + payload: id + }) + } const descriptionColumns: ProDescriptionsItemProps[] = [ { @@ -70,21 +93,13 @@ const FlinkKubernetesJobDetailWeb: React.FC = () => { valueType: 'option', render: () => [
- { - WsFlinkKubernetesJobService.deploy({wsFlinkKubernetesJobId: urlParams.state.id}).then(response => { - message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); - }) - }} + - + {intl.formatMessage({id: 'pages.project.flink.kubernetes.job.detail.deploy'})} + @@ -150,23 +168,38 @@ const FlinkKubernetesJobDetailWeb: React.FC = () => { { label: intl.formatMessage({id: 'pages.project.flink.kubernetes.job.detail.yaml'}), key: 'yaml', - children: + children: }, ] return ( - - - - +
+ + + + + {jobDeployFormData.visiable && ( + { + setJobDeployFormData({visiable: false, data: {}}); + }} + onVisibleChange={(visiable) => { + setJobDeployFormData({visiable: visiable, data: {}}); + }} + data={jobDeployFormData.data} + /> + )} +
); } -export default FlinkKubernetesJobDetailWeb; +const mapModelToProps = ({jobDetail}: any) => ({jobDetail}) +export default connect(mapModelToProps)(FlinkKubernetesJobDetailWeb); diff --git a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql index a6340d036..69430a7b5 100644 --- a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql @@ -42,9 +42,9 @@ VALUES (2, 1, '0', 'sql-runner1', NULL, 'sys', 'sys'); INSERT INTO `ws_flink_artifact` (`id`, `project_id`, `type`, `name`, `remark`, `creator`, `editor`) VALUES (3, 1, '0', 'sql-runner2', NULL, 'sys', 'sys'); INSERT INTO `ws_flink_artifact` (`id`, `project_id`, `type`, `name`, `remark`, `creator`, `editor`) -VALUES (4, 1, '2', 'fake', NULL, 'sys', 'sys'); +VALUES (4, 1, '2', 'e_commerce', NULL, 'sys', 'sys'); INSERT INTO `ws_flink_artifact` (`id`, `project_id`, `type`, `name`, `remark`, `creator`, `editor`) -VALUES (5, 1, '2', 'e_commerce', NULL, 'sys', 'sys'); +VALUES (5, 1, '2', 'fake', NULL, 'sys', 'sys'); INSERT INTO `ws_flink_artifact`(`id`, `project_id`, `type`, `name`, `remark`, `creator`, `editor`) VALUES (6, 1, '0', 'catalog-example', NULL, 'sys', 'sys'); @@ -64,7 +64,7 @@ create table ws_flink_artifact_jar editor varchar(32) comment '修改人', update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', primary key (id), - key idx_flink_artifact (flink_artifact_id) + key idx_flink_artifact (flink_artifact_id) ) engine = innodb comment = 'flink artifact jar'; DROP TABLE IF EXISTS ws_flink_artifact_sql; @@ -80,7 +80,7 @@ CREATE TABLE ws_flink_artifact_sql editor varchar(32), update_time datetime not null default current_timestamp on update current_timestamp, PRIMARY KEY (id), - key idx_flink_artifact (flink_artifact_id) + key idx_flink_artifact (flink_artifact_id) ) ENGINE = INNODB COMMENT = 'flink artifact sql'; INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`, @@ -117,7 +117,7 @@ create table ws_di_job editor varchar(32) comment '修改人', update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', primary key (id), - key idx_flink_artifact (flink_artifact_id) + key idx_flink_artifact (flink_artifact_id) ) engine = innodb comment '数据集成-作业信息'; INSERT INTO ws_di_job (id, flink_artifact_id, job_engine, job_id, current, creator, editor) VALUES (1, 4, 'seatunnel', 'b8e16c94-258c-4487-a88c-8aad40a38b35', 1, 'sys', 'sys'); @@ -415,6 +415,7 @@ CREATE TABLE ws_flink_kubernetes_job_instance ws_flink_kubernetes_job_id bigint not null, instance_id varchar(64) not null, parallelism int not null default 1, + upgrade_mode varchar(32), job_manager text, task_manager text, user_flink_configuration text,