diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java index e676db99c..58e9aa44f 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java @@ -28,6 +28,7 @@ import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind; import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentMode; import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; +import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode; import cn.sliew.scaleph.common.dict.image.ImagePullPolicy; import cn.sliew.scaleph.common.dict.job.*; import cn.sliew.scaleph.common.dict.seatunnel.*; @@ -87,6 +88,7 @@ public enum DictType implements DictDefinition { FLINK_KUBERNETES_DEPLOYMENT_MODE("deployment_mode", "Deployment 模式", DeploymentMode.class), FLINK_KUBERNETES_DEPLOYMENT_KIND("deployment_kind", "Deployment 类型", DeploymentKind.class), FLINK_KUBERNETES_RESOURCE_LIFECYCLE_STATE("resource_lifecycle_state", "Deployment 状态", ResourceLifecycleState.class), + FLINK_KUBERNETES_UPGRADE_MODE("upgrade_mode", "Upgrade 方式", UpgradeMode.class), FLINK_CATALOG_COLUMN_TYPE("flink_catalog_column_type", "Flink Catalog Table Schema 列类型", CatalogColumnType.class), FLINK_CATALOG_TABLE_KIND("flink_catalog_table_kind", "Flink Catalog Table 类型", CatalogTableKind.class), diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/UpgradeMode.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/UpgradeMode.java new file mode 100644 index 000000000..4ac139391 --- /dev/null +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/UpgradeMode.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.common.dict.flink.kubernetes; + +import cn.sliew.scaleph.common.dict.DictInstance; +import com.baomidou.mybatisplus.annotation.EnumValue; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonFormat; + +import java.util.Arrays; + +@JsonFormat(shape = JsonFormat.Shape.OBJECT) +public enum UpgradeMode implements DictInstance { + + STATELESS("stateless", "stateless"), + LAST_STATE("last-state", "last-state"), + SAVEPOINT("savepoint", "savepoint"), + ; + + @JsonCreator + public static UpgradeMode of(String value) { + return Arrays.stream(values()) + .filter(instance -> instance.getValue().equals(value)) + .findAny().orElseThrow(() -> new EnumConstantNotPresentException(UpgradeMode.class, value)); + } + + @EnumValue + private String value; + private String label; + + UpgradeMode(String value, String label) { + this.value = value; + this.label = label; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getLabel() { + return label; + } +} diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java index 3accdaa4d..40f8450c4 100644 --- a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java @@ -19,6 +19,7 @@ package cn.sliew.scaleph.dao.entity.master.ws; import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; +import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode; import cn.sliew.scaleph.dao.entity.BaseDO; import com.baomidou.mybatisplus.annotation.FieldStrategy; import com.baomidou.mybatisplus.annotation.TableField; @@ -50,6 +51,9 @@ public class WsFlinkKubernetesJobInstance extends BaseDO { @TableField("parallelism") private Integer parallelism; + @TableField("upgrade_mode") + private UpgradeMode upgradeMode; + @TableField("job_manager") private String jobManager; diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/SeaTunnelConfHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/SeaTunnelConfHandler.java index 337dcb6be..5d41eb6c9 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/SeaTunnelConfHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/SeaTunnelConfHandler.java @@ -50,6 +50,7 @@ public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeployme public ConfigMap buildSeaTunnelConf(String instanceId, Long wsDiJobId, ObjectMeta objectMeta) throws Exception { WsDiJobDTO wsDiJobDTO = wsDiJobService.queryJobGraph(wsDiJobId); + wsDiJobDTO.getWsFlinkArtifact().setName(instanceId); String prettyJson = seatunnelConfigService.buildConfig(wsDiJobDTO); String plainJson = JacksonUtil.toJsonNode(prettyJson).toString(); diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java index 84665aa51..ccdfd782f 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java @@ -19,6 +19,7 @@ package cn.sliew.scaleph.engine.flink.kubernetes.service.dto; import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; +import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode; import cn.sliew.scaleph.dao.entity.BaseDO; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.TaskManagerSpec; @@ -52,6 +53,9 @@ public class WsFlinkKubernetesJobInstanceDTO extends BaseDO { @Schema(description = "parallelism") private Integer parallelism; + @Schema(description = "upgrade mode") + private UpgradeMode upgradeMode; + @Schema(description = "job manager") private JobManagerSpec jobManager; diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java index 9567bd5be..4b8582e45 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java @@ -18,6 +18,7 @@ package cn.sliew.scaleph.engine.flink.kubernetes.service.param; +import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.TaskManagerSpec; import io.swagger.v3.oas.annotations.media.Schema; @@ -36,6 +37,9 @@ public class WsFlinkKubernetesJobInstanceDeployParam { @Schema(description = "parallelism") private Integer parallelism; + @Schema(description = "upgrade mode") + private UpgradeMode upgradeMode; + @Schema(description = "job manager") private JobManagerSpec jobManager; diff --git a/scaleph-ui-react/src/constant.ts b/scaleph-ui-react/src/constant.ts index 63ff4d45c..1d18bc95b 100644 --- a/scaleph-ui-react/src/constant.ts +++ b/scaleph-ui-react/src/constant.ts @@ -51,6 +51,7 @@ export const DICT_TYPE = { deploymentKind: 'deployment_kind', resourceLifecycleState: 'resource_lifecycle_state', + upgradeMode: 'upgrade_mode', }; diff --git a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts index 8f3c7da81..71fcd1c8c 100644 --- a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts +++ b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts @@ -794,6 +794,18 @@ export default { 'pages.project.flink.kubernetes.job.detail': '详情', 'pages.project.flink.kubernetes.job.detail.deploy': 'Deploy', + 'pages.project.flink.kubernetes.job.detail.deploy.resource': 'Resource', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.jobManagerCpu': 'JobManager CPU', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.jobManagerMemory': 'JobManager Memory', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.jobManagerReplicas': 'JobManager Replicas', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.taskManagerCpu': 'TaskManager CPU', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.taskManagerMemory': 'TaskManager Memory', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.taskManagerReplicas': 'TaskManager Replicas', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.parallelism': 'Parallelism', + 'pages.project.flink.kubernetes.job.detail.deploy.resource.userFlinkConfiguration': 'Flink配置', + 'pages.project.flink.kubernetes.job.detail.deploy.state': 'State', + 'pages.project.flink.kubernetes.job.detail.deploy.state.upgradeMode': 'Upgrade 方式', + 'pages.project.flink.kubernetes.job.detail.suspend': 'Suspend', 'pages.project.flink.kubernetes.job.detail.shutdown': 'Shutdown', 'pages.project.flink.kubernetes.job.detail.savepoint': 'Savepoint', @@ -801,6 +813,8 @@ export default { 'pages.project.flink.kubernetes.job.detail.metrics': 'Metrics', 'pages.project.flink.kubernetes.job.detail.logs': 'Logs', + + 'pages.project.flink.kubernetes.job.detail.yaml': 'YAML', }; diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployResourceStepForm.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployResourceStepForm.tsx new file mode 100644 index 000000000..f6f56b1f2 --- /dev/null +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployResourceStepForm.tsx @@ -0,0 +1,83 @@ +import {useIntl} from "umi"; +import React from "react"; +import {ProFormDigit, ProFormGroup, ProFormText, ProFormTextArea} from "@ant-design/pro-components"; + +const FlinkKubernetesJobDeployResourceStepForm: React.FC = () => { + const intl = useIntl(); + + return ( + + + + + + + + + + + ); +} + +export default FlinkKubernetesJobDeployResourceStepForm; diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm.tsx new file mode 100644 index 000000000..9ce24d922 --- /dev/null +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm.tsx @@ -0,0 +1,25 @@ +import {useIntl} from "umi"; +import React from "react"; +import {ProFormGroup, ProFormRadio} from "@ant-design/pro-components"; +import {DictDataService} from "@/services/admin/dictData.service"; +import {DICT_TYPE} from "@/constant"; + +const FlinkKubernetesJobDeployStateStepForm: React.FC = () => { + const intl = useIntl(); + + return ( + + { + return DictDataService.listDictDataByType2(DICT_TYPE.upgradeMode) + }} + /> + + + ); +} + +export default FlinkKubernetesJobDeployStateStepForm; diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx index e91b81084..ff074aad7 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx @@ -1,10 +1,12 @@ import {useIntl} from "umi"; -import React from "react"; -import {Form, message, Modal} from "antd"; -import {ProForm, ProFormDigit, ProFormGroup, ProFormText} from "@ant-design/pro-components"; +import React, {useRef} from "react"; +import {message, Modal} from "antd"; +import {ProCard, ProFormInstance, StepsForm} from "@ant-design/pro-components"; import {ModalFormProps} from '@/app.d'; -import {WsFlinkKubernetesJob} from "@/services/project/typings"; -import {WORKSPACE_CONF} from "@/constant"; +import {WsFlinkKubernetesJob, WsFlinkKubernetesJobInstanceDeployParam} from "@/services/project/typings"; +import FlinkKubernetesJobDeployResourceStepForm + from "@/pages/Project/Workspace/Kubernetes/Job/Detail/DeployResourceStepForm"; +import FlinkKubernetesJobDeployStateStepForm from "@/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm"; import {WsFlinkKubernetesJobService} from "@/services/project/WsFlinkKubernetesJobService"; const FlinkKubernetesJobDeployForm: React.FC> = ({ @@ -14,115 +16,71 @@ const FlinkKubernetesJobDeployForm: React.FC { const intl = useIntl(); - const [form] = Form.useForm(); - const projectId = localStorage.getItem(WORKSPACE_CONF.projectId); + const formRef = useRef(); return ( { - form.validateFields().then((values) => { - data.id - ? WsFlinkKubernetesJobService.update({...values}).then((response) => { + > + + ) => { + const jobManagerSpec = { + resource: { + cpu: values.jobManagerCpu, + memory: values.jobManagerMemory, + }, + replicas: values.jobManagerReplicas + } + const taskManagerSpec = { + resource: { + cpu: values.taskManagerCpu, + memory: values.taskManagerMemory, + }, + replicas: values.taskManagerReplicas + } + const param: WsFlinkKubernetesJobInstanceDeployParam = { + wsFlinkKubernetesJobId: data.id, + jobManager: jobManagerSpec, + taskManager: taskManagerSpec, + parallelism: values.parallelism, + upgradeMode: values.upgradeMode, + userFlinkConfiguration: values.userFlinkConfiguration + } + return WsFlinkKubernetesJobService.deploy(param).then((response) => { if (response.success) { - message.success(intl.formatMessage({id: 'app.common.operate.edit.success'})); - if (onVisibleChange) { - onVisibleChange(false); - } + message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); + onVisibleChange(false); } }) - : WsFlinkKubernetesJobService.add({...values, projectId: projectId}).then((response) => { - if (response.success) { - message.success(intl.formatMessage({id: 'app.common.operate.new.success'})); - if (onVisibleChange) { - onVisibleChange(false); - } - } - }); - }); - }} - > - - + - - - - - ); } diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx index c2d222777..5e0b42459 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx @@ -93,25 +93,13 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { valueType: 'option', render: () => [
- { - setJobDeployFormData({visiable: true, data: props.jobDetail.job}) - // WsFlinkKubernetesJobService.deploy({wsFlinkKubernetesJobId: props.jobDetail.job.id}).then(response => { - // if (response.success) { - // message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); - // } - // refreshJob(props.jobDetail.job.id) - // }) - }} + - + {intl.formatMessage({id: 'pages.project.flink.kubernetes.job.detail.deploy'})} +