From f28afe8f23d81171bd6052666bea5b978f7f7d49 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Sat, 15 Jul 2023 08:53:02 +0800 Subject: [PATCH 1/7] feature: update flink kubernetes job instance service --- ...FlinkKubernetesJobInstanceServiceImpl.java | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java index 1ad98ce8b..75b53eb7e 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java @@ -22,9 +22,10 @@ import cn.sliew.scaleph.common.util.UUIDUtil; import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstance; import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceMapper; +import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.FlinkDeploymentJobConverter; +import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.FlinkSessionJobConverter; import cn.sliew.scaleph.engine.flink.kubernetes.service.FlinkKubernetesOperatorService; import cn.sliew.scaleph.engine.flink.kubernetes.service.WsFlinkKubernetesJobInstanceService; -import cn.sliew.scaleph.engine.flink.kubernetes.service.WsFlinkKubernetesJobService; import cn.sliew.scaleph.engine.flink.kubernetes.service.convert.WsFlinkKubernetesJobInstanceConvert; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; @@ -48,9 +49,9 @@ public class WsFlinkKubernetesJobInstanceServiceImpl implements WsFlinkKubernete @Autowired private WsFlinkKubernetesJobInstanceMapper wsFlinkKubernetesJobInstanceMapper; @Autowired - private WsFlinkKubernetesJobService wsFlinkKubernetesJobService; - @Autowired private FlinkKubernetesOperatorService flinkKubernetesOperatorService; + @Autowired + private FlinkDeploymentJobConverter flinkDeploymentJobConverter; @Override public Page list(WsFlinkKubernetesJobInstanceListParam param) { @@ -74,12 +75,30 @@ public WsFlinkKubernetesJobInstanceDTO selectOne(Long id) { @Override public WsFlinkKubernetesJobInstanceDTO selectCurrent(Long wsFlinkKubernetesJobId) { + LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(WsFlinkKubernetesJobInstance.class) + .eq(WsFlinkKubernetesJobInstance::getWsFlinkKubernetesJobId, wsFlinkKubernetesJobId) + .orderByDesc(WsFlinkKubernetesJobInstance::getId) + .last("limit 1"); + + WsFlinkKubernetesJobInstance record = wsFlinkKubernetesJobInstanceMapper.selectOne(queryWrapper); + if (record != null) { + return WsFlinkKubernetesJobInstanceConvert.INSTANCE.toDto(record); + } return null; } @Override public Object asYaml(Long id) throws Exception { - return null; + WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = selectOne(id); + WsFlinkKubernetesJobDTO jobDTO = jobInstanceDTO.getWsFlinkKubernetesJob(); + switch (jobDTO.getDeploymentKind()) { + case FLINK_DEPLOYMENT: + return flinkDeploymentJobConverter.convertTo(jobDTO); + case FLINK_SESSION_JOB: + return FlinkSessionJobConverter.INSTANCE.convertTo(jobDTO); + default: + throw new RuntimeException("unsupport flink deployment mode for " + jobDTO.getDeploymentKind()); + } } @Override @@ -97,7 +116,8 @@ public void deploy(WsFlinkKubernetesJobInstanceDeployParam param) throws Excepti record.setUserFlinkConfiguration(JacksonUtil.toJsonString(param.getUserFlinkConfiguration())); } wsFlinkKubernetesJobInstanceMapper.insert(record); - WsFlinkKubernetesJobDTO jobDTO = wsFlinkKubernetesJobService.selectOne(param.getWsFlinkKubernetesJobId()); + WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = selectOne(record.getId()); + WsFlinkKubernetesJobDTO jobDTO = jobInstanceDTO.getWsFlinkKubernetesJob(); Object yaml = asYaml(record.getId()); switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: @@ -113,7 +133,7 @@ public void deploy(WsFlinkKubernetesJobInstanceDeployParam param) throws Excepti @Override public void shutdown(WsFlinkKubernetesJobInstanceShutdownParam param) throws Exception { WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = selectOne(param.getId()); - WsFlinkKubernetesJobDTO jobDTO = wsFlinkKubernetesJobService.selectOne(jobInstanceDTO.getWsFlinkKubernetesJobId()); + WsFlinkKubernetesJobDTO jobDTO = jobInstanceDTO.getWsFlinkKubernetesJob(); Object yaml = asYaml(param.getId()); switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: From 6c765ad4689b378fed0a35153daccd1ee5877f4a Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Sat, 22 Jul 2023 08:51:50 +0800 Subject: [PATCH 2/7] feature: add flink job shutdown form --- .../ws/WsFlinkKubernetesJobInstance.java | 3 + .../ws/WsFlinkKubernetesJobInstanceMapper.xml | 7 +- .../dto/WsFlinkKubernetesJobInstanceDTO.java | 3 + ...FlinkKubernetesJobInstanceDeployParam.java | 3 + .../src/locales/zh-CN/pages/project.ts | 1 + .../Job/Detail/DeployStateStepForm.tsx | 8 +- .../Kubernetes/Job/Detail/JobDeployForm.tsx | 1 + .../Kubernetes/Job/Detail/JobShutdownForm.tsx | 278 ++++++++++++++++++ .../Workspace/Kubernetes/Job/Detail/index.tsx | 81 +++-- .../src/services/project/typings.d.ts | 2 + .../docker/mysql/init.d/scaleph-ws-mysql.sql | 7 +- 11 files changed, 370 insertions(+), 24 deletions(-) create mode 100644 scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobShutdownForm.tsx diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java index 40f8450c4..c61c64a5e 100644 --- a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstance.java @@ -54,6 +54,9 @@ public class WsFlinkKubernetesJobInstance extends BaseDO { @TableField("upgrade_mode") private UpgradeMode upgradeMode; + @TableField("allow_non_restored_state") + private Boolean allowNonRestoredState; + @TableField("job_manager") private String jobManager; diff --git a/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkKubernetesJobInstanceMapper.xml b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkKubernetesJobInstanceMapper.xml index 36127e678..4ea42ae3e 100644 --- a/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkKubernetesJobInstanceMapper.xml +++ b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkKubernetesJobInstanceMapper.xml @@ -28,6 +28,8 @@ + + @@ -50,6 +52,8 @@ + + @@ -68,7 +72,8 @@ id, creator, create_time, editor, update_time, ws_flink_kubernetes_job_id, instance_id, - parallelism, job_manager, task_manager, user_flink_configuration, + parallelism, upgrade_mode, allow_non_restored_state, + job_manager, task_manager, user_flink_configuration, `state`, `error`, cluster_info, task_manager_info, start_time, end_time, duration diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java index ccdfd782f..bdfebcd41 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceDTO.java @@ -56,6 +56,9 @@ public class WsFlinkKubernetesJobInstanceDTO extends BaseDO { @Schema(description = "upgrade mode") private UpgradeMode upgradeMode; + @Schema(description = "allow to skip savepoint state") + private Boolean allowNonRestoredState; + @Schema(description = "job manager") private JobManagerSpec jobManager; diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java index 4b8582e45..e6a43ac78 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceDeployParam.java @@ -40,6 +40,9 @@ public class WsFlinkKubernetesJobInstanceDeployParam { @Schema(description = "upgrade mode") private UpgradeMode upgradeMode; + @Schema(description = "allow to skip savepoint state") + private Boolean allowNonRestoredState; + @Schema(description = "job manager") private JobManagerSpec jobManager; diff --git a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts index 71fcd1c8c..62c10a074 100644 --- a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts +++ b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts @@ -805,6 +805,7 @@ export default { 'pages.project.flink.kubernetes.job.detail.deploy.resource.userFlinkConfiguration': 'Flink配置', 'pages.project.flink.kubernetes.job.detail.deploy.state': 'State', 'pages.project.flink.kubernetes.job.detail.deploy.state.upgradeMode': 'Upgrade 方式', + 'pages.project.flink.kubernetes.job.detail.deploy.state.allowNonRestoredState': 'AllowNonRestoredState', 'pages.project.flink.kubernetes.job.detail.suspend': 'Suspend', 'pages.project.flink.kubernetes.job.detail.shutdown': 'Shutdown', diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm.tsx index 9ce24d922..9003b19f7 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/DeployStateStepForm.tsx @@ -1,6 +1,6 @@ import {useIntl} from "umi"; import React from "react"; -import {ProFormGroup, ProFormRadio} from "@ant-design/pro-components"; +import {ProFormGroup, ProFormRadio, ProFormSwitch} from "@ant-design/pro-components"; import {DictDataService} from "@/services/admin/dictData.service"; import {DICT_TYPE} from "@/constant"; @@ -17,6 +17,12 @@ const FlinkKubernetesJobDeployStateStepForm: React.FC = () => { return DictDataService.listDictDataByType2(DICT_TYPE.upgradeMode) }} /> + ); diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx index ff074aad7..108a0f47f 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm.tsx @@ -56,6 +56,7 @@ const FlinkKubernetesJobDeployForm: React.FC { diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobShutdownForm.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobShutdownForm.tsx new file mode 100644 index 000000000..d9fbaf6ec --- /dev/null +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/JobShutdownForm.tsx @@ -0,0 +1,278 @@ +import {useIntl} from "umi"; +import React from "react"; +import {Form, message, Modal} from "antd"; +import { + ProForm, + ProFormDependency, + ProFormDigit, + ProFormGroup, + ProFormRadio, + ProFormSelect, + ProFormText +} from "@ant-design/pro-components"; +import {ModalFormProps} from '@/app.d'; +import { + WsDiJobSelectListParam, + WsFlinkArtifactJarSelectListParam, + WsFlinkArtifactSqlSelectListParam, + WsFlinkKubernetesDeploymentSelectListParam, + WsFlinkKubernetesJob, + WsFlinkKubernetesSessionClusterSelectListParam +} from "@/services/project/typings"; +import {DictDataService} from "@/services/admin/dictData.service"; +import {DICT_TYPE, WORKSPACE_CONF} from "@/constant"; +import {WsFlinkKubernetesDeploymentService} from "@/services/project/WsFlinkKubernetesDeploymentService"; +import {WsFlinkKubernetesSessionClusterService} from "@/services/project/WsFlinkKubernetesSessionClusterService"; +import {FlinkArtifactJarService} from "@/services/project/flinkArtifactJar.service"; +import {FlinkArtifactSqlService} from "@/services/project/WsFlinkArtifactSqlService"; +import {WsFlinkKubernetesJobService} from "@/services/project/WsFlinkKubernetesJobService"; +import {WsDiJobService} from "@/services/project/WsDiJobService"; + +const FlinkKubernetesJobShutdownForm: React.FC> = ({ + data, + visible, + onVisibleChange, + onCancel + }) => { + const intl = useIntl(); + const [form] = Form.useForm(); + const projectId = localStorage.getItem(WORKSPACE_CONF.projectId); + + return ( + { + form.validateFields().then((values) => { + data.id + ? WsFlinkKubernetesJobService.update({...values}).then((response) => { + if (response.success) { + message.success(intl.formatMessage({id: 'app.common.operate.edit.success'})); + if (onVisibleChange) { + onVisibleChange(false); + } + } + }) + : WsFlinkKubernetesJobService.add({...values, projectId: projectId}).then((response) => { + if (response.success) { + message.success(intl.formatMessage({id: 'app.common.operate.new.success'})); + if (onVisibleChange) { + onVisibleChange(false); + } + } + }); + }); + }} + > + + + + ); +} + +export default FlinkKubernetesJobShutdownForm; diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx index 5e0b42459..0547d0895 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx @@ -1,6 +1,6 @@ import {connect, useAccess, useIntl, useLocation} from "umi"; import React, {useEffect, useState} from "react"; -import {Button, message, Popconfirm, Tabs} from "antd"; +import {Button, message, Modal, Popconfirm, Tabs} from "antd"; import FlinkKubernetesJobDetailYAMLWeb from "@/pages/Project/Workspace/Kubernetes/Job/Detail/YAML"; import {ProDescriptionsItemProps} from "@ant-design/pro-descriptions"; import {WsFlinkKubernetesJob} from "@/services/project/typings"; @@ -11,13 +11,13 @@ import { CameraOutlined, CaretRightOutlined, CloseOutlined, - DashboardOutlined, + DashboardOutlined, DeleteOutlined, OrderedListOutlined, PauseOutlined } from "@ant-design/icons"; import {WsFlinkKubernetesJobService} from "@/services/project/WsFlinkKubernetesJobService"; -import FlinkKubernetesJobForm from "@/pages/Project/Workspace/Kubernetes/Job/JobForm"; import FlinkKubernetesJobDeployForm from "@/pages/Project/Workspace/Kubernetes/Job/Detail/JobDeployForm"; +import FlinkKubernetesJobShutdownForm from "@/pages/Project/Workspace/Kubernetes/Job/Detail/JobShutdownForm"; const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { const intl = useIntl(); @@ -27,6 +27,10 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { visiable: boolean; data: WsFlinkKubernetesJob; }>({visiable: false, data: {}}); + const [jobShutdownFormData, setJobShutdownFormData] = useState<{ + visiable: boolean; + data: WsFlinkKubernetesJob; + }>({visiable: false, data: {}}); useEffect(() => { const data = urlParams.state as WsFlinkKubernetesJob @@ -96,7 +100,9 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { @@ -108,23 +114,47 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { {intl.formatMessage({id: 'pages.project.flink.kubernetes.job.detail.suspend'})} - { - WsFlinkKubernetesJobService.shutdown({id: props.jobDetail.job.jobInstance.id}).then(response => { - if (response.success) { - message.success(intl.formatMessage({id: 'app.common.operate.submit.success'})); - } - refreshJob(props.jobDetail.job.id) - }) + - + {intl.formatMessage({id: 'pages.project.flink.kubernetes.job.detail.shutdown'})} + + + {/* {*/} + {/* WsFlinkKubernetesJobService.shutdown({id: props.jobDetail.job.jobInstance.id}).then(response => {*/} + {/* if (response.success) {*/} + {/* message.success(intl.formatMessage({id: 'app.common.operate.submit.success'}));*/} + {/* }*/} + {/* refreshJob(props.jobDetail.job.id)*/} + {/* })*/} + {/* }}*/} + {/*>*/} + + {/**/} + + @@ -116,45 +114,10 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { - - {/* {*/} - {/* WsFlinkKubernetesJobService.shutdown({id: props.jobDetail.job.jobInstance.id}).then(response => {*/} - {/* if (response.success) {*/} - {/* message.success(intl.formatMessage({id: 'app.common.operate.submit.success'}));*/} - {/* }*/} - {/* refreshJob(props.jobDetail.job.id)*/} - {/* })*/} - {/* }}*/} - {/*>*/} - - {/**/} - - + + ,