Skip to content

Commit

Permalink
[Feature][scaleph-engine-seatunnel] replace job graph by scaleph-dag …
Browse files Browse the repository at this point in the history
…module (#678)

* feature: replace dag

* feature: replace dag attrs

* feature: replace dag

* feature: replace dag

* fix: npe

---------

Co-authored-by: wangqi <wangqi@xinxuan.net>
  • Loading branch information
kalencaya and wangqi authored Dec 30, 2023
1 parent 5c14b28 commit ddec5b1
Show file tree
Hide file tree
Showing 18 changed files with 355 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import cn.sliew.scaleph.api.annotation.Logging;
import cn.sliew.scaleph.common.exception.ScalephException;
import cn.sliew.scaleph.common.util.PropertyUtil;
import cn.sliew.scaleph.engine.flink.service.WsFlinkArtifactJarService;
import cn.sliew.scaleph.engine.flink.service.dto.WsFlinkArtifactJarDTO;
import cn.sliew.scaleph.engine.flink.service.param.*;
Expand All @@ -32,7 +31,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

Expand All @@ -42,7 +40,6 @@
import java.io.IOException;
import java.net.URLEncoder;
import java.util.List;
import java.util.Map;

@Tag(name = "Flink管理-artifact-jar")
@RestController
Expand Down Expand Up @@ -88,10 +85,6 @@ public ResponseEntity<WsFlinkArtifactJarDTO> selectOne(@PathVariable("id") Long
@PutMapping
@Operation(summary = "上传 artifact jar", description = "上传 artifact jar")
public ResponseEntity<ResponseVO> upload(@Valid WsFlinkArtifactJarUploadParam param, @RequestPart("file") MultipartFile file) throws IOException, UidGenerateException {
if (StringUtils.hasText(param.getJarParams())) {
Map<String, Object> map = PropertyUtil.formatPropFromStr(param.getJarParams(), "\n", ":");
param.setJarParams(PropertyUtil.mapToFormatProp(map, "\n", ":"));
}
wsFlinkArtifactJarService.upload(param, file);
return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK);
}
Expand All @@ -100,10 +93,6 @@ public ResponseEntity<ResponseVO> upload(@Valid WsFlinkArtifactJarUploadParam pa
@PostMapping("jar")
@Operation(summary = "修改 artifact jar", description = "修改 artifact jar")
public ResponseEntity<ResponseVO> updateJar(@Valid WsFlinkArtifactJarUpdateParam param, @RequestPart(value = "file", required = false) MultipartFile file) throws UidGenerateException, IOException {
if (StringUtils.hasText(param.getJarParams())) {
Map<String, Object> map = PropertyUtil.formatPropFromStr(param.getJarParams(), "\n", ":");
param.setJarParams(PropertyUtil.mapToFormatProp(map, "\n", ":"));
}
this.wsFlinkArtifactJarService.update(param, file);
return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,38 @@

package cn.sliew.scaleph.common.util;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public enum PropertyUtil {
;

public static Map<String, Object> formatPropFromStr(String str, String lineSeparator, String kvSeparator) {
Map<String, Object> map = new HashMap<>();
public static Map<String, String> formatPropFromStr(String str) {
return formatPropFromStr(str, "\n", "=");
}

public static String mapToFormatProp(Map<String, String> map) {
return mapToFormatProp(map, "\n", "=");
}

public static Map<String, String> formatPropFromStr(String str, String lineSeparator, String kvSeparator) {
if (StringUtils.hasText(str)) {
String[] arr = str.split(lineSeparator);
for (int i = 0; i < arr.length; i++) {
String[] kv = arr[i].split(kvSeparator);
if (kv.length == 2) {
map.put(kv[0], kv[1]);
}
}
return Splitter.on(lineSeparator).withKeyValueSeparator(kvSeparator).split(str);
}
return map;
return Collections.emptyMap();
}

public static String mapToFormatProp(Map<String, Object> map, String lineSeparator, String kvSeparator) {
public static String mapToFormatProp(Map<String, String> map, String lineSeparator, String kvSeparator) {
if (CollectionUtils.isEmpty(map)) {
return null;
}
StringBuffer buffer = new StringBuffer();
map.forEach((k, v) -> {
buffer.append(k).append(kvSeparator).append(v).append(lineSeparator);
});
return buffer.toString();
return Joiner.on(lineSeparator).withKeyValueSeparator(kvSeparator).join(map);
}

public static Properties mapToProperties(Map<String, Object> map) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import cn.sliew.scaleph.engine.seatunnel.service.vo.DiJobAttrVO;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;

import java.util.Collection;
import java.util.List;

public interface WsDiJobService {
Expand Down Expand Up @@ -54,5 +53,4 @@ public interface WsDiJobService {

Long totalCnt(String jobType);

int clone(Long sourceJobId, Long targetJobId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.seatunnel.service.convert;

import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.convert.BaseConvert;
import cn.sliew.scaleph.common.dict.job.JobAttrType;
import cn.sliew.scaleph.common.util.PropertyUtil;
import cn.sliew.scaleph.engine.seatunnel.service.vo.DiJobAttrVO;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.mapstruct.Mapper;
import org.mapstruct.ReportingPolicy;
import org.mapstruct.factory.Mappers;

import java.util.Map;

@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE)
public interface WsDiJobAttrVOConvert extends BaseConvert<JsonNode, DiJobAttrVO> {
WsDiJobAttrVOConvert INSTANCE = Mappers.getMapper(WsDiJobAttrVOConvert.class);

@Override
default JsonNode toDo(DiJobAttrVO dto) {
ObjectNode objectNode = JacksonUtil.createObjectNode();
objectNode.putPOJO(JobAttrType.VARIABLE.getValue(), PropertyUtil.formatPropFromStr(dto.getJobAttr()));
objectNode.putPOJO(JobAttrType.ENV.getValue(), PropertyUtil.formatPropFromStr(dto.getJobProp()));
objectNode.putPOJO(JobAttrType.PROPERTIES.getValue(), PropertyUtil.formatPropFromStr(dto.getEngineProp()));
return objectNode;
}

@Override
default DiJobAttrVO toDto(JsonNode entity) {
DiJobAttrVO vo = new DiJobAttrVO();
if (entity == null) {
return vo;
}
ObjectNode dagAttrs = (ObjectNode) entity;
Map<String, String> variable = JacksonUtil.toObject(dagAttrs.get(JobAttrType.VARIABLE.getValue()), new TypeReference<Map<String, String>>() {});
Map<String, String> env = JacksonUtil.toObject(dagAttrs.get(JobAttrType.ENV.getValue()), new TypeReference<Map<String, String>>() {});
Map<String, String> properties = JacksonUtil.toObject(dagAttrs.get(JobAttrType.PROPERTIES.getValue()), new TypeReference<Map<String, String>>() {});
vo.setJobAttr(PropertyUtil.mapToFormatProp(variable));
vo.setJobProp(PropertyUtil.mapToFormatProp(env));
vo.setEngineProp(PropertyUtil.mapToFormatProp(properties));
return vo;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.seatunnel.service.convert;

import cn.sliew.scaleph.common.convert.BaseConvert;
import cn.sliew.scaleph.dag.service.dto.DagLinkDTO;
import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobLinkDTO;
import org.mapstruct.Mapper;
import org.mapstruct.ReportingPolicy;
import org.mapstruct.factory.Mappers;
import org.springframework.beans.BeanUtils;

@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE)
public interface WsDiJobLinkConvert2 extends BaseConvert<DagLinkDTO, WsDiJobLinkDTO> {
WsDiJobLinkConvert2 INSTANCE = Mappers.getMapper(WsDiJobLinkConvert2.class);

@Override
default DagLinkDTO toDo(WsDiJobLinkDTO dto) {
throw new UnsupportedOperationException();
}

@Override
default WsDiJobLinkDTO toDto(DagLinkDTO entity) {
WsDiJobLinkDTO dto = new WsDiJobLinkDTO();
BeanUtils.copyProperties(entity, dto);
dto.setLinkCode(entity.getLinkId());
dto.setFromStepCode(entity.getFromStepId());
dto.setToStepCode(entity.getToStepId());
return dto;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.seatunnel.service.convert;

import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.convert.BaseConvert;
import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginName;
import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginType;
import cn.sliew.scaleph.dag.service.dto.DagStepDTO;
import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobStepDTO;
import cn.sliew.scaleph.system.service.convert.DictVoConvert;
import com.fasterxml.jackson.core.type.TypeReference;
import org.mapstruct.Mapper;
import org.mapstruct.ReportingPolicy;
import org.mapstruct.factory.Mappers;
import org.springframework.beans.BeanUtils;

import java.util.Map;

@Mapper(uses = {DictVoConvert.class}, unmappedTargetPolicy = ReportingPolicy.IGNORE)
public interface WsDiJobStepConvert2 extends BaseConvert<DagStepDTO, WsDiJobStepDTO> {
WsDiJobStepConvert2 INSTANCE = Mappers.getMapper(WsDiJobStepConvert2.class);

@Override
default DagStepDTO toDo(WsDiJobStepDTO dto) {
throw new UnsupportedOperationException();
}

@Override
default WsDiJobStepDTO toDto(DagStepDTO entity) {
WsDiJobStepDTO dto = new WsDiJobStepDTO();
BeanUtils.copyProperties(entity, dto);
dto.setStepCode(entity.getStepId());
dto.setStepTitle(entity.getStepName());
dto.setPositionX(entity.getPositionX());
dto.setPositionY(entity.getPositionY());
Map<String, Object> stepAttrs = JacksonUtil.toObject(entity.getStepAttrs(), new TypeReference<Map<String, Object>>() {});
dto.setStepAttrs(stepAttrs);
dto.setStepType(SeaTunnelPluginType.of(stepAttrs.get("type").toString()));
dto.setStepName(SeaTunnelPluginName.of(stepAttrs.get("name").toString()));
return dto;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import cn.sliew.scaleph.common.dict.common.YesOrNo;
import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelEngineType;
import cn.sliew.scaleph.engine.seatunnel.service.vo.DiJobAttrVO;
import cn.sliew.scaleph.engine.seatunnel.service.vo.JobGraphVO;
import cn.sliew.scaleph.project.service.dto.WsFlinkArtifactDTO;
import cn.sliew.scaleph.system.model.BaseDTO;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class WsDiJobDTO extends BaseDTO {
private YesOrNo current;

@Schema(description = "作业属性信息")
private List<WsDiJobAttrDTO> jobAttrList;
private DiJobAttrVO jobAttrList;

@Schema(description = "作业连线信息")
private List<WsDiJobLinkDTO> jobLinkList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginName;
import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginType;
import cn.sliew.scaleph.common.util.PropertyUtil;
import cn.sliew.scaleph.engine.seatunnel.service.SeatunnelConfigService;
import cn.sliew.scaleph.engine.seatunnel.service.SeatunnelConnectorService;
import cn.sliew.scaleph.engine.seatunnel.service.constant.SeaTunnelConstant;
import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobAttrDTO;
import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobDTO;
import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobLinkDTO;
import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobStepDTO;
import cn.sliew.scaleph.engine.seatunnel.service.vo.DiJobAttrVO;
import cn.sliew.scaleph.plugin.framework.exception.PluginException;
import cn.sliew.scaleph.plugin.seatunnel.flink.SeaTunnelConnectorPlugin;
import cn.sliew.scaleph.plugin.seatunnel.flink.env.JobNameProperties;
Expand Down Expand Up @@ -73,27 +74,19 @@ public String buildConfig(WsDiJobDTO job) throws Exception {
return conf.toPrettyString();
}

private void buildEnvs(ObjectNode conf, String jobName, List<WsDiJobAttrDTO> jobAttrList) {
private void buildEnvs(ObjectNode conf, String jobName, DiJobAttrVO jobAttrList) {
conf.set(SeaTunnelConstant.ENV, buildEnv(jobName, jobAttrList));
}

private ObjectNode buildEnv(String jobName, List<WsDiJobAttrDTO> jobAttrs) {
private ObjectNode buildEnv(String jobName, DiJobAttrVO jobAttrs) {
ObjectNode env = JacksonUtil.createObjectNode();
env.put(JobNameProperties.JOB_NAME.getName(), jobName);
if (CollectionUtils.isEmpty(jobAttrs)) {
if (jobAttrs == null || StringUtils.hasText(jobAttrs.getJobAttr()) == false) {
return env;
}
for (WsDiJobAttrDTO attr : jobAttrs) {
switch (attr.getJobAttrType()) {
case ENV:
break;
case VARIABLE:
env.put(attr.getJobAttrKey(), attr.getJobAttrValue());
break;
case PROPERTIES:
break;
default:
}
Map<String, String> jobAttrList = PropertyUtil.formatPropFromStr(jobAttrs.getJobAttr());
for (Map.Entry<String, String> entry : jobAttrList.entrySet()) {
env.put(entry.getKey(), entry.getValue());
}
return env;
}
Expand Down Expand Up @@ -127,7 +120,7 @@ private MutableGraph<ObjectNode> buildGraph(WsDiJobDTO wsDiJobDTO) throws Plugin
}

private Properties mergeJobAttrs(WsDiJobStepDTO step) throws PluginException {
Properties properties = convertToProperties(step.getStepAttrs());
Properties properties = PropertyUtil.mapToProperties((Map<String, Object>) step.getStepAttrs().get("attrs"));
SeaTunnelPluginType pluginType = SeaTunnelPluginType.of(step.getStepType().getValue());
SeaTunnelConnectorPlugin connector = seatunnelConnectorService.getConnector(pluginType, step.getStepName());
for (ResourceProperty resource : connector.getRequiredResources()) {
Expand All @@ -142,23 +135,6 @@ private Properties mergeJobAttrs(WsDiJobStepDTO step) throws PluginException {
return properties;
}

private Properties convertToProperties(Map<String, Object> stepAttrList) {
Properties properties = new Properties();
if (CollectionUtils.isEmpty(stepAttrList)) {
return properties;
}
stepAttrList.forEach((key, value) -> {
if (value instanceof String) {
if (StringUtils.hasText(String.valueOf(value))) {
properties.put(key, value);
}
} else {
properties.put(key, value);
}
});
return properties;
}

private void buildNodes(ObjectNode conf, Set<ObjectNode> nodes) {
ArrayNode sourceConf = JacksonUtil.createArrayNode();
ArrayNode transformConf = JacksonUtil.createArrayNode();
Expand Down
Loading

0 comments on commit ddec5b1

Please sign in to comment.