Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][scaleph-engine-flink-kubernetes] support additional dependencies for flink kubernetes deployment #580

Merged
merged 14 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@

@JsonFormat(shape = JsonFormat.Shape.OBJECT)
public enum FlinkJobState implements DictInstance {
SUBMITING("SUBMITING", "提交中"),
SUBMITED("SUBMITED", "已提交"),
SUBMIT_FAILED("SUBMIT_FAILED", "提交失败"),

INITIALIZING("INITIALIZING", "初始化"),
CREATED("CREATED", "已创建"),
RUNNING("RUNNING", "运行中"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public class WsFlinkKubernetesDeployment extends BaseDO {
@TableField("ingress")
private String ingress;

@TableField("additional_dependencies")
private String additionalDependencies;

@TableField("remark")
private String remark;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.dao.entity.master.ws;

import cn.sliew.scaleph.common.dict.flink.FlinkJobState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.dao.entity.BaseDO;
Expand Down Expand Up @@ -54,6 +55,9 @@ public class WsFlinkKubernetesJobInstance extends BaseDO {
@TableField("upgrade_mode")
private UpgradeMode upgradeMode;

@TableField("allow_non_restored_state")
private Boolean allowNonRestoredState;

@TableField("job_manager")
private String jobManager;

Expand All @@ -66,6 +70,9 @@ public class WsFlinkKubernetesJobInstance extends BaseDO {
@TableField(value = "`state`", updateStrategy = FieldStrategy.IGNORED)
private ResourceLifecycleState state;

@TableField(value = "job_state", updateStrategy = FieldStrategy.IGNORED)
private FlinkJobState jobState;

@TableField(value = "`error`", updateStrategy = FieldStrategy.IGNORED)
private String error;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class WsFlinkKubernetesSessionCluster extends BaseDO {
@TableField("ingress")
private String ingress;

@TableField("additional_dependencies")
private String additionalDependencies;

@TableField("support_sql_gateway")
private YesOrNo supportSqlGateway;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public class WsFlinkKubernetesTemplate extends BaseDO {
@TableField("ingress")
private String ingress;

@TableField("additional_dependencies")
private String additionalDependencies;

@TableField("remark")
private String remark;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@
<result column="flink_configuration" property="flinkConfiguration"/>
<result column="log_configuration" property="logConfiguration"/>
<result column="ingress" property="ingress"/>
<result column="additional_dependencies" property="additionalDependencies"/>
<result column="remark" property="remark"/>
</resultMap>

<sql id="Base_Column_List">
id, creator, create_time, editor, update_time,
project_id, cluster_credential_id, `name`, deployment_id, namespace,
kubernetes_options, job_manager, task_manager, pod_template,
flink_configuration, log_configuration, ingress, remark
flink_configuration, log_configuration, ingress, additional_dependencies,
remark
</sql>

</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
<result column="ws_flink_kubernetes_job_id" property="wsFlinkKubernetesJobId"/>
<result column="instance_id" property="instanceId"/>
<result column="parallelism" property="parallelism"/>
<result column="upgrade_mode" property="upgradeMode"/>
<result column="allow_non_restored_state" property="allowNonRestoredState"/>
<result column="job_manager" property="jobManager"/>
<result column="task_manager" property="taskManager"/>
<result column="user_flink_configuration" property="userFlinkConfiguration"/>
<result column="state" property="state"/>
<result column="job_state" property="jobState"/>
<result column="error" property="error"/>
<result column="cluster_info" property="clusterInfo"/>
<result column="task_manager_info" property="taskManagerInfo"/>
Expand All @@ -50,10 +53,13 @@
<result column="ws_flink_kubernetes_job_id" property="wsFlinkKubernetesJobId"/>
<result column="instance_id" property="instanceId"/>
<result column="parallelism" property="parallelism"/>
<result column="upgrade_mode" property="upgradeMode"/>
<result column="allow_non_restored_state" property="allowNonRestoredState"/>
<result column="job_manager" property="jobManager"/>
<result column="task_manager" property="taskManager"/>
<result column="user_flink_configuration" property="userFlinkConfiguration"/>
<result column="state" property="state"/>
<result column="job_state" property="jobState"/>
<result column="error" property="error"/>
<result column="cluster_info" property="clusterInfo"/>
<result column="task_manager_info" property="taskManagerInfo"/>
Expand All @@ -68,8 +74,9 @@
<sql id="Base_Column_List">
id, creator, create_time, editor, update_time,
ws_flink_kubernetes_job_id, instance_id,
parallelism, job_manager, task_manager, user_flink_configuration,
`state`, `error`, cluster_info, task_manager_info,
parallelism, upgrade_mode, allow_non_restored_state,
job_manager, task_manager, user_flink_configuration,
`state`, job_state, `error`, cluster_info, task_manager_info,
start_time, end_time, duration
</sql>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<result column="flink_configuration" property="flinkConfiguration"/>
<result column="log_configuration" property="logConfiguration"/>
<result column="ingress" property="ingress"/>
<result column="additional_dependencies" property="additionalDependencies"/>
<result column="support_sql_gateway" property="supportSqlGateway"/>
<result column="state" property="state"/>
<result column="error" property="error"/>
Expand All @@ -51,7 +52,7 @@
project_id, cluster_credential_id, `name`, session_cluster_id,
namespace, kubernetes_options, job_manager, task_manager,
pod_template, flink_configuration, log_configuration,
ingress, support_sql_gateway, `state`, `error`, cluster_info,
task_manager_info, remark
ingress, additional_dependencies, support_sql_gateway,
`state`, `error`, cluster_info, task_manager_info, remark
</sql>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<result column="flink_configuration" property="flinkConfiguration" />
<result column="log_configuration" property="logConfiguration" />
<result column="ingress" property="ingress" />
<result column="additional_dependencies" property="additionalDependencies" />
<result column="remark" property="remark" />
</resultMap>

Expand All @@ -47,7 +48,7 @@
project_id, `name`, template_id, deployment_kind, namespace,
kubernetes_options, job_manager, task_manager,
pod_template, flink_configuration, log_configuration,
ingress, remark
ingress, additional_dependencies, remark
</sql>

</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected Runnable doExecute(ActionContext context, ActionListener<ActionResult>
private void process() {
List<Long> jobIds = wsFlinkKubernetesJobService.listAll();
jobIds.forEach(this::doProcess);
log.info("update flink kubernetes job status success! update size: {}", jobIds.size());
log.debug("update flink kubernetes job status success! update size: {}", jobIds.size());
}

private void doProcess(Long jobId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected Runnable doExecute(ActionContext context, ActionListener<ActionResult>
private void process() {
List<Long> sessionClusterIds = wsFlinkKubernetesSessionClusterService.listAll();
sessionClusterIds.forEach(this::doProcess);
log.info("update flink kubernetes session-cluster status success! update size: {}", sessionClusterIds.size());
log.debug("update flink kubernetes session-cluster status success! update size: {}", sessionClusterIds.size());
}

private void doProcess(Long sessionClusterId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ default WsFlinkKubernetesDeployment toDo(WsFlinkKubernetesDeploymentDTO dto) {
if (dto.getIngress() != null) {
entity.setIngress(JacksonUtil.toJsonString(dto.getIngress()));
}
if (dto.getAdditionalDependencies() != null) {
entity.setAdditionalDependencies(JacksonUtil.toJsonString(dto.getAdditionalDependencies()));
}
return entity;
}

Expand Down Expand Up @@ -93,6 +96,9 @@ default WsFlinkKubernetesDeploymentDTO toDto(WsFlinkKubernetesDeployment entity)
if (StringUtils.hasText(entity.getIngress())) {
dto.setIngress(JacksonUtil.parseJsonString(entity.getIngress(), IngressSpec.class));
}
if (StringUtils.hasText(entity.getAdditionalDependencies())) {
dto.setAdditionalDependencies(JacksonUtil.parseJsonArray(entity.getAdditionalDependencies(), Long.TYPE));
}
return dto;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ default WsFlinkKubernetesSessionCluster toDo(WsFlinkKubernetesSessionClusterDTO
if (dto.getIngress() != null) {
entity.setIngress(JacksonUtil.toJsonString(dto.getIngress()));
}
if (dto.getAdditionalDependencies() != null) {
entity.setAdditionalDependencies(JacksonUtil.toJsonString(dto.getAdditionalDependencies()));
}
if (CollectionUtils.isEmpty(dto.getClusterInfo()) == false) {
entity.setClusterInfo(JacksonUtil.toJsonString(dto.getClusterInfo()));
}
Expand Down Expand Up @@ -100,6 +103,9 @@ default WsFlinkKubernetesSessionClusterDTO toDto(WsFlinkKubernetesSessionCluster
if (StringUtils.hasText(entity.getIngress())) {
dto.setIngress(JacksonUtil.parseJsonString(entity.getIngress(), IngressSpec.class));
}
if (StringUtils.hasText(entity.getAdditionalDependencies())) {
dto.setAdditionalDependencies(JacksonUtil.parseJsonArray(entity.getAdditionalDependencies(), Long.TYPE));
}
if (StringUtils.hasText(entity.getClusterInfo())) {
dto.setClusterInfo(JacksonUtil.parseJsonString(entity.getClusterInfo(), Map.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ default WsFlinkKubernetesTemplate toDo(WsFlinkKubernetesTemplateDTO dto) {
if (dto.getIngress() != null) {
entity.setIngress(JacksonUtil.toJsonString(dto.getIngress()));
}
if (dto.getAdditionalDependencies() != null) {
entity.setAdditionalDependencies(JacksonUtil.toJsonString(dto.getAdditionalDependencies()));
}
return entity;
}

Expand Down Expand Up @@ -93,6 +96,9 @@ default WsFlinkKubernetesTemplateDTO toDto(WsFlinkKubernetesTemplate entity) {
if (StringUtils.hasText(entity.getIngress())) {
dto.setIngress(JacksonUtil.parseJsonString(entity.getIngress(), IngressSpec.class));
}
if (StringUtils.hasText(entity.getAdditionalDependencies())) {
dto.setAdditionalDependencies(JacksonUtil.parseJsonArray(entity.getAdditionalDependencies(), Long.TYPE));
}
return dto;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.List;
import java.util.Map;

@Data
Expand Down Expand Up @@ -71,6 +72,9 @@ public class WsFlinkKubernetesDeploymentDTO extends BaseDTO {
@Schema(description = "ingress spec")
private IngressSpec ingress;

@Schema(description = "additional dependencies")
private List<Long> additionalDependencies;

@Schema(description = "remark")
private String remark;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.engine.flink.kubernetes.service.dto;

import cn.sliew.scaleph.common.dict.flink.FlinkJobState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.dao.entity.BaseDO;
Expand Down Expand Up @@ -56,6 +57,9 @@ public class WsFlinkKubernetesJobInstanceDTO extends BaseDO {
@Schema(description = "upgrade mode")
private UpgradeMode upgradeMode;

@Schema(description = "allow to skip savepoint state")
private Boolean allowNonRestoredState;

@Schema(description = "job manager")
private JobManagerSpec jobManager;

Expand All @@ -65,9 +69,12 @@ public class WsFlinkKubernetesJobInstanceDTO extends BaseDO {
@Schema(description = "user flink configuration")
private Map<String, String> userFlinkConfiguration;

@Schema(description = "state")
@Schema(description = "deploy state")
private ResourceLifecycleState state;

@Schema(description = "job state")
private FlinkJobState jobState;

@Schema(description = "error")
private String error;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.List;
import java.util.Map;

@Data
Expand Down Expand Up @@ -74,6 +75,9 @@ public class WsFlinkKubernetesSessionClusterDTO extends BaseDTO {
@Schema(description = "ingress")
private IngressSpec ingress;

@Schema(description = "additional dependencies")
private List<Long> additionalDependencies;

@Schema(description = "support sql gateway")
private YesOrNo supportSqlGateway;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -77,6 +78,9 @@ public class WsFlinkKubernetesTemplateDTO extends BaseDTO {
@Schema(description = "ingress")
private IngressSpec ingress;

@Schema(description = "additional dependencies")
private List<Long> additionalDependencies;

@Schema(description = "remark")
private String remark;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public WsFlinkKubernetesDeploymentDTO fromTemplate(Long templateId) {
wsFlinkKubernetesDeploymentDTO.setFlinkConfiguration(wsFlinkKubernetesTemplateDTO.getFlinkConfiguration());
wsFlinkKubernetesDeploymentDTO.setLogConfiguration(wsFlinkKubernetesTemplateDTO.getLogConfiguration());
wsFlinkKubernetesDeploymentDTO.setIngress(wsFlinkKubernetesTemplateDTO.getIngress());
wsFlinkKubernetesDeploymentDTO.setAdditionalDependencies(wsFlinkKubernetesTemplateDTO.getAdditionalDependencies());
wsFlinkKubernetesDeploymentDTO.setRemark("generated from template-" + wsFlinkKubernetesTemplateDTO.getName());
return wsFlinkKubernetesDeploymentDTO;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import cn.sliew.milky.common.exception.Rethrower;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.dict.flink.FlinkJobState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.util.UUIDUtil;
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstance;
import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceMapper;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.status.FlinkDeploymentStatus;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.status.JobStatus;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.instance.FlinkJobInstanceConverterFactory;
import cn.sliew.scaleph.engine.flink.kubernetes.service.FlinkKubernetesOperatorService;
import cn.sliew.scaleph.engine.flink.kubernetes.service.WsFlinkKubernetesJobInstanceService;
Expand All @@ -48,6 +50,7 @@
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.Date;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -185,6 +188,15 @@ public int updateStatus(Long id, FlinkDeploymentStatus status) {
WsFlinkKubernetesJobInstance record = new WsFlinkKubernetesJobInstance();
record.setId(id);
record.setState(EnumUtils.getEnum(ResourceLifecycleState.class, status.getLifecycleState().name()));
if (status.getJobStatus() != null) {
JobStatus jobStatus = status.getJobStatus();
if (jobStatus.getState() != null) {
record.setJobState(FlinkJobState.of(jobStatus.getState()));
}
if (jobStatus.getStartTime() != null) {
record.setStartTime(new Date(Long.parseLong(jobStatus.getStartTime())));
}
}
record.setError(status.getError());
if (CollectionUtils.isEmpty(status.getClusterInfo())) {
record.setClusterInfo(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public WsFlinkKubernetesSessionClusterDTO fromTemplate(Long templateId) {
wsFlinkKubernetesSessionClusterDTO.setFlinkConfiguration(wsFlinkKubernetesTemplateDTO.getFlinkConfiguration());
wsFlinkKubernetesSessionClusterDTO.setLogConfiguration(wsFlinkKubernetesTemplateDTO.getLogConfiguration());
wsFlinkKubernetesSessionClusterDTO.setIngress(wsFlinkKubernetesTemplateDTO.getIngress());
wsFlinkKubernetesSessionClusterDTO.setAdditionalDependencies(wsFlinkKubernetesTemplateDTO.getAdditionalDependencies());
wsFlinkKubernetesSessionClusterDTO.setRemark("generated from template-" + wsFlinkKubernetesTemplateDTO.getName());
return wsFlinkKubernetesSessionClusterDTO;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private WsFlinkKubernetesTemplateDTO doMergeDefault(WsFlinkKubernetesTemplateDTO
result.setFlinkConfiguration(TemplateMerger.merge(globalDefault.getFlinkConfiguration(), dto.getFlinkConfiguration(), Map.class));
result.setLogConfiguration(TemplateMerger.merge(globalDefault.getLogConfiguration(), dto.getLogConfiguration(), Map.class));
result.setIngress(TemplateMerger.merge(globalDefault.getIngress(), dto.getIngress(), IngressSpec.class));
result.setAdditionalDependencies(TemplateMerger.merge(globalDefault.getAdditionalDependencies(), dto.getAdditionalDependencies(), List.class));
result.setRemark(StringUtils.hasText(dto.getRemark()) ? dto.getRemark() : globalDefault.getRemark());
return result;
}
Expand Down Expand Up @@ -143,6 +144,7 @@ public int update(WsFlinkKubernetesTemplateUpdateParam param) {
@Override
public int updateTemplate(WsFlinkKubernetesTemplateDTO param) {
WsFlinkKubernetesTemplateDTO mergeWithDefault = mergeDefault(param);
mergeWithDefault.setAdditionalDependencies(param.getAdditionalDependencies());
WsFlinkKubernetesTemplate record = WsFlinkKubernetesTemplateConvert.INSTANCE.toDo(mergeWithDefault);
record.setId(param.getId());
record.setProjectId(param.getProjectId());
Expand Down
Loading