diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsArtifactJarController.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsArtifactJarController.java index cf5a2ebdc..d01f627db 100644 --- a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsArtifactJarController.java +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsArtifactJarController.java @@ -20,7 +20,6 @@ import cn.sliew.scaleph.api.annotation.Logging; import cn.sliew.scaleph.common.exception.ScalephException; -import cn.sliew.scaleph.common.util.PropertyUtil; import cn.sliew.scaleph.engine.flink.service.WsFlinkArtifactJarService; import cn.sliew.scaleph.engine.flink.service.dto.WsFlinkArtifactJarDTO; import cn.sliew.scaleph.engine.flink.service.param.*; @@ -32,7 +31,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; @@ -42,7 +40,6 @@ import java.io.IOException; import java.net.URLEncoder; import java.util.List; -import java.util.Map; @Tag(name = "Flink管理-artifact-jar") @RestController @@ -88,10 +85,6 @@ public ResponseEntity selectOne(@PathVariable("id") Long @PutMapping @Operation(summary = "上传 artifact jar", description = "上传 artifact jar") public ResponseEntity upload(@Valid WsFlinkArtifactJarUploadParam param, @RequestPart("file") MultipartFile file) throws IOException, UidGenerateException { - if (StringUtils.hasText(param.getJarParams())) { - Map map = PropertyUtil.formatPropFromStr(param.getJarParams(), "\n", ":"); - param.setJarParams(PropertyUtil.mapToFormatProp(map, "\n", ":")); - } wsFlinkArtifactJarService.upload(param, file); return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK); } @@ -100,10 +93,6 @@ public ResponseEntity upload(@Valid WsFlinkArtifactJarUploadParam pa @PostMapping("jar") @Operation(summary = "修改 artifact jar", description = "修改 artifact jar") public ResponseEntity updateJar(@Valid WsFlinkArtifactJarUpdateParam param, @RequestPart(value = "file", required = false) MultipartFile file) throws UidGenerateException, IOException { - if (StringUtils.hasText(param.getJarParams())) { - Map map = PropertyUtil.formatPropFromStr(param.getJarParams(), "\n", ":"); - param.setJarParams(PropertyUtil.mapToFormatProp(map, "\n", ":")); - } this.wsFlinkArtifactJarService.update(param, file); return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK); } diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/PropertyUtil.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/PropertyUtil.java index fad40dd7e..503676a35 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/PropertyUtil.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/PropertyUtil.java @@ -18,39 +18,38 @@ package cn.sliew.scaleph.common.util; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; -import java.util.HashMap; +import java.util.Collections; import java.util.Map; import java.util.Properties; public enum PropertyUtil { ; - public static Map formatPropFromStr(String str, String lineSeparator, String kvSeparator) { - Map map = new HashMap<>(); + public static Map formatPropFromStr(String str) { + return formatPropFromStr(str, "\n", "="); + } + + public static String mapToFormatProp(Map map) { + return mapToFormatProp(map, "\n", "="); + } + + public static Map formatPropFromStr(String str, String lineSeparator, String kvSeparator) { if (StringUtils.hasText(str)) { - String[] arr = str.split(lineSeparator); - for (int i = 0; i < arr.length; i++) { - String[] kv = arr[i].split(kvSeparator); - if (kv.length == 2) { - map.put(kv[0], kv[1]); - } - } + return Splitter.on(lineSeparator).withKeyValueSeparator(kvSeparator).split(str); } - return map; + return Collections.emptyMap(); } - public static String mapToFormatProp(Map map, String lineSeparator, String kvSeparator) { + public static String mapToFormatProp(Map map, String lineSeparator, String kvSeparator) { if (CollectionUtils.isEmpty(map)) { return null; } - StringBuffer buffer = new StringBuffer(); - map.forEach((k, v) -> { - buffer.append(k).append(kvSeparator).append(v).append(lineSeparator); - }); - return buffer.toString(); + return Joiner.on(lineSeparator).withKeyValueSeparator(kvSeparator).join(map); } public static Properties mapToProperties(Map map) { diff --git a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/WsDiJobService.java b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/WsDiJobService.java index 4b1b256b0..4871ae6b9 100644 --- a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/WsDiJobService.java +++ b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/WsDiJobService.java @@ -23,7 +23,6 @@ import cn.sliew.scaleph.engine.seatunnel.service.vo.DiJobAttrVO; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import java.util.Collection; import java.util.List; public interface WsDiJobService { @@ -54,5 +53,4 @@ public interface WsDiJobService { Long totalCnt(String jobType); - int clone(Long sourceJobId, Long targetJobId); } diff --git a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/convert/WsDiJobAttrVOConvert.java b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/convert/WsDiJobAttrVOConvert.java new file mode 100644 index 000000000..ebde238cb --- /dev/null +++ b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/convert/WsDiJobAttrVOConvert.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.seatunnel.service.convert; + +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.common.convert.BaseConvert; +import cn.sliew.scaleph.common.dict.job.JobAttrType; +import cn.sliew.scaleph.common.util.PropertyUtil; +import cn.sliew.scaleph.engine.seatunnel.service.vo.DiJobAttrVO; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.mapstruct.Mapper; +import org.mapstruct.ReportingPolicy; +import org.mapstruct.factory.Mappers; + +import java.util.Map; + +@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE) +public interface WsDiJobAttrVOConvert extends BaseConvert { + WsDiJobAttrVOConvert INSTANCE = Mappers.getMapper(WsDiJobAttrVOConvert.class); + + @Override + default JsonNode toDo(DiJobAttrVO dto) { + ObjectNode objectNode = JacksonUtil.createObjectNode(); + objectNode.putPOJO(JobAttrType.VARIABLE.getValue(), PropertyUtil.formatPropFromStr(dto.getJobAttr())); + objectNode.putPOJO(JobAttrType.ENV.getValue(), PropertyUtil.formatPropFromStr(dto.getJobProp())); + objectNode.putPOJO(JobAttrType.PROPERTIES.getValue(), PropertyUtil.formatPropFromStr(dto.getEngineProp())); + return objectNode; + } + + @Override + default DiJobAttrVO toDto(JsonNode entity) { + DiJobAttrVO vo = new DiJobAttrVO(); + if (entity == null) { + return vo; + } + ObjectNode dagAttrs = (ObjectNode) entity; + Map variable = JacksonUtil.toObject(dagAttrs.get(JobAttrType.VARIABLE.getValue()), new TypeReference>() {}); + Map env = JacksonUtil.toObject(dagAttrs.get(JobAttrType.ENV.getValue()), new TypeReference>() {}); + Map properties = JacksonUtil.toObject(dagAttrs.get(JobAttrType.PROPERTIES.getValue()), new TypeReference>() {}); + vo.setJobAttr(PropertyUtil.mapToFormatProp(variable)); + vo.setJobProp(PropertyUtil.mapToFormatProp(env)); + vo.setEngineProp(PropertyUtil.mapToFormatProp(properties)); + return vo; + } +} diff --git a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/convert/WsDiJobLinkConvert2.java b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/convert/WsDiJobLinkConvert2.java new file mode 100644 index 000000000..06efa56d5 --- /dev/null +++ b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/convert/WsDiJobLinkConvert2.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.seatunnel.service.convert; + +import cn.sliew.scaleph.common.convert.BaseConvert; +import cn.sliew.scaleph.dag.service.dto.DagLinkDTO; +import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobLinkDTO; +import org.mapstruct.Mapper; +import org.mapstruct.ReportingPolicy; +import org.mapstruct.factory.Mappers; +import org.springframework.beans.BeanUtils; + +@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE) +public interface WsDiJobLinkConvert2 extends BaseConvert { + WsDiJobLinkConvert2 INSTANCE = Mappers.getMapper(WsDiJobLinkConvert2.class); + + @Override + default DagLinkDTO toDo(WsDiJobLinkDTO dto) { + throw new UnsupportedOperationException(); + } + + @Override + default WsDiJobLinkDTO toDto(DagLinkDTO entity) { + WsDiJobLinkDTO dto = new WsDiJobLinkDTO(); + BeanUtils.copyProperties(entity, dto); + dto.setLinkCode(entity.getLinkId()); + dto.setFromStepCode(entity.getFromStepId()); + dto.setToStepCode(entity.getToStepId()); + return dto; + } +} diff --git a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/convert/WsDiJobStepConvert2.java b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/convert/WsDiJobStepConvert2.java new file mode 100644 index 000000000..543b8bf9c --- /dev/null +++ b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/convert/WsDiJobStepConvert2.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.seatunnel.service.convert; + +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.common.convert.BaseConvert; +import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginName; +import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginType; +import cn.sliew.scaleph.dag.service.dto.DagStepDTO; +import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobStepDTO; +import cn.sliew.scaleph.system.service.convert.DictVoConvert; +import com.fasterxml.jackson.core.type.TypeReference; +import org.mapstruct.Mapper; +import org.mapstruct.ReportingPolicy; +import org.mapstruct.factory.Mappers; +import org.springframework.beans.BeanUtils; + +import java.util.Map; + +@Mapper(uses = {DictVoConvert.class}, unmappedTargetPolicy = ReportingPolicy.IGNORE) +public interface WsDiJobStepConvert2 extends BaseConvert { + WsDiJobStepConvert2 INSTANCE = Mappers.getMapper(WsDiJobStepConvert2.class); + + @Override + default DagStepDTO toDo(WsDiJobStepDTO dto) { + throw new UnsupportedOperationException(); + } + + @Override + default WsDiJobStepDTO toDto(DagStepDTO entity) { + WsDiJobStepDTO dto = new WsDiJobStepDTO(); + BeanUtils.copyProperties(entity, dto); + dto.setStepCode(entity.getStepId()); + dto.setStepTitle(entity.getStepName()); + dto.setPositionX(entity.getPositionX()); + dto.setPositionY(entity.getPositionY()); + Map stepAttrs = JacksonUtil.toObject(entity.getStepAttrs(), new TypeReference>() {}); + dto.setStepAttrs(stepAttrs); + dto.setStepType(SeaTunnelPluginType.of(stepAttrs.get("type").toString())); + dto.setStepName(SeaTunnelPluginName.of(stepAttrs.get("name").toString())); + return dto; + } +} diff --git a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/dto/WsDiJobDTO.java b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/dto/WsDiJobDTO.java index da4ae2197..ae46bbabe 100644 --- a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/dto/WsDiJobDTO.java +++ b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/dto/WsDiJobDTO.java @@ -20,6 +20,7 @@ import cn.sliew.scaleph.common.dict.common.YesOrNo; import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelEngineType; +import cn.sliew.scaleph.engine.seatunnel.service.vo.DiJobAttrVO; import cn.sliew.scaleph.engine.seatunnel.service.vo.JobGraphVO; import cn.sliew.scaleph.project.service.dto.WsFlinkArtifactDTO; import cn.sliew.scaleph.system.model.BaseDTO; @@ -58,7 +59,7 @@ public class WsDiJobDTO extends BaseDTO { private YesOrNo current; @Schema(description = "作业属性信息") - private List jobAttrList; + private DiJobAttrVO jobAttrList; @Schema(description = "作业连线信息") private List jobLinkList; diff --git a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/SeatunnelConfigServiceImpl.java b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/SeatunnelConfigServiceImpl.java index cf782c5ed..9c2439ceb 100644 --- a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/SeatunnelConfigServiceImpl.java +++ b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/SeatunnelConfigServiceImpl.java @@ -21,13 +21,14 @@ import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginName; import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginType; +import cn.sliew.scaleph.common.util.PropertyUtil; import cn.sliew.scaleph.engine.seatunnel.service.SeatunnelConfigService; import cn.sliew.scaleph.engine.seatunnel.service.SeatunnelConnectorService; import cn.sliew.scaleph.engine.seatunnel.service.constant.SeaTunnelConstant; -import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobAttrDTO; import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobDTO; import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobLinkDTO; import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobStepDTO; +import cn.sliew.scaleph.engine.seatunnel.service.vo.DiJobAttrVO; import cn.sliew.scaleph.plugin.framework.exception.PluginException; import cn.sliew.scaleph.plugin.seatunnel.flink.SeaTunnelConnectorPlugin; import cn.sliew.scaleph.plugin.seatunnel.flink.env.JobNameProperties; @@ -73,27 +74,19 @@ public String buildConfig(WsDiJobDTO job) throws Exception { return conf.toPrettyString(); } - private void buildEnvs(ObjectNode conf, String jobName, List jobAttrList) { + private void buildEnvs(ObjectNode conf, String jobName, DiJobAttrVO jobAttrList) { conf.set(SeaTunnelConstant.ENV, buildEnv(jobName, jobAttrList)); } - private ObjectNode buildEnv(String jobName, List jobAttrs) { + private ObjectNode buildEnv(String jobName, DiJobAttrVO jobAttrs) { ObjectNode env = JacksonUtil.createObjectNode(); env.put(JobNameProperties.JOB_NAME.getName(), jobName); - if (CollectionUtils.isEmpty(jobAttrs)) { + if (jobAttrs == null || StringUtils.hasText(jobAttrs.getJobAttr()) == false) { return env; } - for (WsDiJobAttrDTO attr : jobAttrs) { - switch (attr.getJobAttrType()) { - case ENV: - break; - case VARIABLE: - env.put(attr.getJobAttrKey(), attr.getJobAttrValue()); - break; - case PROPERTIES: - break; - default: - } + Map jobAttrList = PropertyUtil.formatPropFromStr(jobAttrs.getJobAttr()); + for (Map.Entry entry : jobAttrList.entrySet()) { + env.put(entry.getKey(), entry.getValue()); } return env; } @@ -127,7 +120,7 @@ private MutableGraph buildGraph(WsDiJobDTO wsDiJobDTO) throws Plugin } private Properties mergeJobAttrs(WsDiJobStepDTO step) throws PluginException { - Properties properties = convertToProperties(step.getStepAttrs()); + Properties properties = PropertyUtil.mapToProperties((Map) step.getStepAttrs().get("attrs")); SeaTunnelPluginType pluginType = SeaTunnelPluginType.of(step.getStepType().getValue()); SeaTunnelConnectorPlugin connector = seatunnelConnectorService.getConnector(pluginType, step.getStepName()); for (ResourceProperty resource : connector.getRequiredResources()) { @@ -142,23 +135,6 @@ private Properties mergeJobAttrs(WsDiJobStepDTO step) throws PluginException { return properties; } - private Properties convertToProperties(Map stepAttrList) { - Properties properties = new Properties(); - if (CollectionUtils.isEmpty(stepAttrList)) { - return properties; - } - stepAttrList.forEach((key, value) -> { - if (value instanceof String) { - if (StringUtils.hasText(String.valueOf(value))) { - properties.put(key, value); - } - } else { - properties.put(key, value); - } - }); - return properties; - } - private void buildNodes(ObjectNode conf, Set nodes) { ArrayNode sourceConf = JacksonUtil.createArrayNode(); ArrayNode transformConf = JacksonUtil.createArrayNode(); diff --git a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/WsDiJobServiceImpl.java b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/WsDiJobServiceImpl.java index b9b9b9de4..1ac7d0445 100644 --- a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/WsDiJobServiceImpl.java +++ b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/WsDiJobServiceImpl.java @@ -21,10 +21,11 @@ import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.common.dict.common.YesOrNo; import cn.sliew.scaleph.common.dict.flink.FlinkJobType; -import cn.sliew.scaleph.common.dict.job.JobAttrType; -import cn.sliew.scaleph.common.util.BeanUtil; import cn.sliew.scaleph.dag.service.DagService; +import cn.sliew.scaleph.dag.service.dto.DagDTO; import cn.sliew.scaleph.dag.service.dto.DagInstanceDTO; +import cn.sliew.scaleph.dag.service.param.DagSimpleAddParam; +import cn.sliew.scaleph.dag.service.param.DagSimpleUpdateParam; import cn.sliew.scaleph.dag.service.vo.DagGraphVO; import cn.sliew.scaleph.dao.DataSourceConstants; import cn.sliew.scaleph.dao.entity.master.ws.WsDiJob; @@ -32,23 +33,28 @@ import cn.sliew.scaleph.engine.seatunnel.service.WsDiJobAttrService; import cn.sliew.scaleph.engine.seatunnel.service.WsDiJobGraphService; import cn.sliew.scaleph.engine.seatunnel.service.WsDiJobService; +import cn.sliew.scaleph.engine.seatunnel.service.convert.WsDiJobAttrVOConvert; import cn.sliew.scaleph.engine.seatunnel.service.convert.WsDiJobConvert; -import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobAttrDTO; +import cn.sliew.scaleph.engine.seatunnel.service.convert.WsDiJobLinkConvert2; +import cn.sliew.scaleph.engine.seatunnel.service.convert.WsDiJobStepConvert2; import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobDTO; +import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobLinkDTO; +import cn.sliew.scaleph.engine.seatunnel.service.dto.WsDiJobStepDTO; import cn.sliew.scaleph.engine.seatunnel.service.param.*; import cn.sliew.scaleph.engine.seatunnel.service.vo.DiJobAttrVO; import cn.sliew.scaleph.project.service.WsFlinkArtifactService; import cn.sliew.scaleph.project.service.dto.WsFlinkArtifactDTO; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; -import org.springframework.util.StringUtils; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; import static cn.sliew.milky.common.check.Ensures.checkState; @@ -60,10 +66,6 @@ public class WsDiJobServiceImpl implements WsDiJobService { @Autowired private WsDiJobMapper diJobMapper; @Autowired - private WsDiJobGraphService wsDiJobGraphService; - @Autowired - private WsDiJobAttrService wsDiJobAttrService; - @Autowired private DagService dagService; @Override @@ -98,8 +100,7 @@ public WsDiJobDTO insert(WsDiJobAddParam param) { flinkArtifact.setRemark(param.getRemark()); flinkArtifact = wsFlinkArtifactService.insert(flinkArtifact); - DagInstanceDTO instanceDTO = new DagInstanceDTO(); - Long dagId = dagService.insert(instanceDTO); + Long dagId = dagService.insert(new DagSimpleAddParam()); WsDiJob record = new WsDiJob(); record.setFlinkArtifactId(flinkArtifact.getId()); record.setJobId(UUID.randomUUID().toString()); @@ -134,9 +135,6 @@ public int delete(Long id) { if (wsDiJobDTO.getCurrent() == YesOrNo.YES) { wsFlinkArtifactService.deleteById(wsDiJobDTO.getWsFlinkArtifact().getId()); } - //todo check if there is running job instance - wsDiJobGraphService.deleteBatch(Collections.singletonList(id)); - wsDiJobAttrService.deleteByJobId(Collections.singletonList(id)); return diJobMapper.deleteById(id); } @@ -155,30 +153,42 @@ public int deleteBatch(List ids) { @Override public WsDiJobDTO queryJobGraph(Long id) { WsDiJobDTO job = selectOne(id); - wsDiJobGraphService.queryJobGraph(job); - job.setJobAttrList(wsDiJobAttrService.listJobAttr(id)); + DagDTO dagDTO = dagService.selectOne(job.getDagId()); + fillGraph(job, dagDTO); return job; } + private void fillGraph(WsDiJobDTO job, DagDTO dagDTO) { + List jobLinkDTOS = dagDTO.getLinks().stream().map(link -> { + WsDiJobLinkDTO dto = WsDiJobLinkConvert2.INSTANCE.toDto(link); + dto.setJobId(job.getId()); + return dto; + }).collect(Collectors.toList()); + List jobStepDTOS = dagDTO.getSteps().stream().map(step -> { + WsDiJobStepDTO dto = WsDiJobStepConvert2.INSTANCE.toDto(step); + dto.setJobId(job.getId()); + return dto; + }).collect(Collectors.toList()); + job.setJobLinkList(jobLinkDTOS); + job.setJobStepList(jobStepDTOS); + // 属性信息 + DiJobAttrVO jobAttrVO = WsDiJobAttrVOConvert.INSTANCE.toDto(dagDTO.getDagAttrs()); + jobAttrVO.setJobId(job.getId()); + job.setJobAttrList(jobAttrVO); + } + @Transactional(rollbackFor = Exception.class, transactionManager = DataSourceConstants.MASTER_TRANSACTION_MANAGER_FACTORY) @Override public Long saveJobStep(WsDiJobStepParam param) { - DagGraphVO jobGraphVO = JacksonUtil.parseJsonString(param.getJobGraph(), DagGraphVO.class); - wsDiJobGraphService.saveJobGraph(param.getJobId(), jobGraphVO); WsDiJobDTO wsDiJobDTO = selectOne(param.getJobId()); + DagGraphVO jobGraphVO = JacksonUtil.parseJsonString(param.getJobGraph(), DagGraphVO.class); dagService.replace(wsDiJobDTO.getDagId(), jobGraphVO); - - WsDiJobStepParam copiedParam = BeanUtil.copy(param, new WsDiJobStepParam()); - copiedParam.setJobId(param.getJobId()); - wsDiJobGraphService.updateJobStep(copiedParam); - return param.getJobId(); } @Transactional(rollbackFor = Exception.class, transactionManager = DataSourceConstants.MASTER_TRANSACTION_MANAGER_FACTORY) @Override public Long saveJobGraph(WsDiJobGraphParam param) { - wsDiJobGraphService.saveJobGraph(param.getJobId(), param.getJobGraph()); WsDiJobDTO wsDiJobDTO = selectOne(param.getJobId()); dagService.replace(wsDiJobDTO.getDagId(), param.getJobGraph()); return param.getJobId(); @@ -186,73 +196,27 @@ public Long saveJobGraph(WsDiJobGraphParam param) { @Override public DiJobAttrVO listJobAttrs(Long id) { - DiJobAttrVO vo = new DiJobAttrVO(); + WsDiJobDTO wsDiJobDTO = selectOne(id); + DagInstanceDTO instanceDTO = dagService.selectSimpleOne(wsDiJobDTO.getDagId()); + DiJobAttrVO vo = WsDiJobAttrVOConvert.INSTANCE.toDto(instanceDTO.getDagAttrs()); vo.setJobId(id); - List list = wsDiJobAttrService.listJobAttr(id); - for (WsDiJobAttrDTO jobAttr : list) { - String str = jobAttr.getJobAttrKey().concat("=").concat(jobAttr.getJobAttrValue()); - String tempStr = null; - switch (jobAttr.getJobAttrType()) { - case VARIABLE: - tempStr = StringUtils.hasText(vo.getJobAttr()) ? vo.getJobAttr() : ""; - vo.setJobAttr(tempStr + str + "\n"); - break; - case ENV: - tempStr = StringUtils.hasText(vo.getJobProp()) ? vo.getJobProp() : ""; - vo.setJobProp(tempStr + str + "\n"); - break; - case PROPERTIES: - tempStr = StringUtils.hasText(vo.getEngineProp()) ? vo.getEngineProp() : ""; - vo.setEngineProp(tempStr + str + "\n"); - break; - default: - } - } return vo; } @Transactional(rollbackFor = Exception.class, transactionManager = DataSourceConstants.MASTER_TRANSACTION_MANAGER_FACTORY) @Override public Long saveJobAttrs(DiJobAttrVO vo) { - Map map = new HashMap<>(); - parseJobAttr(map, vo.getJobAttr(), JobAttrType.VARIABLE, vo.getJobId()); - parseJobAttr(map, vo.getJobProp(), JobAttrType.ENV, vo.getJobId()); - parseJobAttr(map, vo.getEngineProp(), JobAttrType.PROPERTIES, vo.getJobId()); - wsDiJobAttrService.deleteByJobId(Collections.singletonList(vo.getJobId())); - for (Map.Entry entry : map.entrySet()) { - wsDiJobAttrService.upsert(entry.getValue()); - } + WsDiJobDTO wsDiJobDTO = selectOne(vo.getJobId()); + DagSimpleUpdateParam param = new DagSimpleUpdateParam(); + param.setId(wsDiJobDTO.getDagId()); + param.setDagAttrs(WsDiJobAttrVOConvert.INSTANCE.toDo(vo)); + dagService.update(param); return vo.getJobId(); } - private void parseJobAttr(Map map, String str, JobAttrType jobAttrType, Long jobId) { - if (StringUtils.hasText(str)) { - String[] lines = str.split("\n"); - for (String line : lines) { - String[] kv = line.split("="); - if (kv.length == 2 && StringUtils.hasText(kv[0]) && StringUtils.hasText(kv[1])) { - WsDiJobAttrDTO dto = new WsDiJobAttrDTO(); - dto.setJobId(jobId); - dto.setJobAttrType(jobAttrType); - dto.setJobAttrKey(kv[0]); - dto.setJobAttrValue(kv[1]); - map.put(jobId + jobAttrType.getValue() + kv[0], dto); - } - } - } - } - @Override public Long totalCnt(String jobType) { - LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(WsDiJob.class); - return diJobMapper.selectCount(queryWrapper); + return diJobMapper.selectCount(Wrappers.emptyWrapper()); } - @Override - public int clone(Long sourceJobId, Long targetJobId) { - int result = 0; - wsDiJobGraphService.clone(sourceJobId, targetJobId); - result += wsDiJobAttrService.clone(sourceJobId, targetJobId); - return result; - } } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/clickhouse/sink/ClickHouseSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/clickhouse/sink/ClickHouseSinkPlugin.java index 18e460b53..5dfc3a2ff 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/clickhouse/sink/ClickHouseSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/clickhouse/sink/ClickHouseSinkPlugin.java @@ -74,11 +74,10 @@ public ObjectNode createConf() { for (PropertyDescriptor descriptor : getSupportedProperties()) { if (properties.contains(descriptor)) { if (CLICKHOUSE_CONF.getName().equals(descriptor.getName())) { - Map map = PropertyUtil - .formatPropFromStr(properties.getValue(descriptor), "\n", "="); - for (Map.Entry entry : map.entrySet()) { + Map map = PropertyUtil.formatPropFromStr(properties.getValue(descriptor)); + for (Map.Entry entry : map.entrySet()) { objectNode - .put("clickhouse." + entry.getKey(), String.valueOf(entry.getValue())); + .put("clickhouse." + entry.getKey(), entry.getValue()); } } else if (FIELDS.getName().equals(descriptor.getName())) { String[] splitFields = properties.getValue(descriptor).split(","); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/doris/sink/DorisSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/doris/sink/DorisSinkPlugin.java index 74e3073f4..401efdc2e 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/doris/sink/DorisSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/doris/sink/DorisSinkPlugin.java @@ -69,9 +69,9 @@ public ObjectNode createConf() { for (PropertyDescriptor descriptor : getSupportedProperties()) { if (properties.contains(descriptor)) { if (DORIS_CONF.getName().equals(descriptor.getName())) { - Map map = PropertyUtil.formatPropFromStr(properties.getValue(descriptor), "\n", "="); - for (Map.Entry entry : map.entrySet()) { - objectNode.put("doris." + entry.getKey(), String.valueOf(entry.getValue())); + Map map = PropertyUtil.formatPropFromStr(properties.getValue(descriptor)); + for (Map.Entry entry : map.entrySet()) { + objectNode.put("doris." + entry.getKey(), entry.getValue()); } } else { objectNode.put(descriptor.getName(), properties.getValue(descriptor)); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/kafka/sink/KafkaSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/kafka/sink/KafkaSinkPlugin.java index fbfc6d097..5372f8ab7 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/kafka/sink/KafkaSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/kafka/sink/KafkaSinkPlugin.java @@ -75,9 +75,9 @@ public ObjectNode createConf() { String bootStrapServers = metaDatasource.getProps().get(KafkaProperties.BOOTSTRAP_SERVERS.getName()).toString(); objectNode.put(PRODUCER_BOOTSTRAP_SERVERS.getName().replace('_', '.'), bootStrapServers); } else if (PRODUCER_CONF.getName().equals(descriptor.getName())) { - Map map = PropertyUtil.formatPropFromStr(properties.getValue(descriptor), "\n", "="); - for (Map.Entry entry : map.entrySet()) { - objectNode.put("producer." + entry.getKey(), String.valueOf(entry.getValue())); + Map map = PropertyUtil.formatPropFromStr(properties.getValue(descriptor)); + for (Map.Entry entry : map.entrySet()) { + objectNode.put("producer." + entry.getKey(), entry.getValue()); } } else if (SEMANTIC.getName().equals(descriptor.getName())) { DictVO dictVO = JacksonUtil.parseJsonString(properties.getValue(descriptor), DictVO.class); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/kafka/source/KafkaSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/kafka/source/KafkaSourcePlugin.java index 74ada2335..152b6b044 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/kafka/source/KafkaSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-native-flink/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connector/kafka/source/KafkaSourcePlugin.java @@ -82,14 +82,14 @@ public ObjectNode createConf() { String bootStrapServers = metaDatasource.getProps().get(KafkaProperties.BOOTSTRAP_SERVERS.getName()).toString(); objectNode.put(CONSUMER_BOOTSTRAP_SERVERS.getName().replace("_", "."), bootStrapServers); } else if (CONSUMER_CONF.getName().equals(descriptor.getName())) { - Map map = PropertyUtil.formatPropFromStr(properties.getValue(descriptor), "\n", "="); - for (Map.Entry entry : map.entrySet()) { - objectNode.put("consumer." + entry.getKey(), String.valueOf(entry.getValue())); + Map map = PropertyUtil.formatPropFromStr(properties.getValue(descriptor)); + for (Map.Entry entry : map.entrySet()) { + objectNode.put("consumer." + entry.getKey(), entry.getValue()); } } else if (FORMAT_CONF.getName().equals(descriptor.getName())) { - Map map = PropertyUtil.formatPropFromStr(properties.getValue(descriptor), "\n", "="); - for (Map.Entry entry : map.entrySet()) { - objectNode.put(entry.getKey(), String.valueOf(entry.getValue())); + Map map = PropertyUtil.formatPropFromStr(properties.getValue(descriptor)); + for (Map.Entry entry : map.entrySet()) { + objectNode.put(entry.getKey(), entry.getValue()); } } else if (descriptor.getName().contains("_")) { objectNode.put(descriptor.getName().replace("_", "."), properties.getValue(descriptor)); diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagService.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagService.java index b516ed924..381c76145 100644 --- a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagService.java +++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagService.java @@ -20,6 +20,8 @@ import cn.sliew.scaleph.dag.service.dto.DagDTO; import cn.sliew.scaleph.dag.service.dto.DagInstanceDTO; +import cn.sliew.scaleph.dag.service.param.DagSimpleAddParam; +import cn.sliew.scaleph.dag.service.param.DagSimpleUpdateParam; import cn.sliew.scaleph.dag.service.vo.DagGraphVO; import java.util.List; @@ -28,7 +30,11 @@ public interface DagService { DagDTO selectOne(Long dagId); - Long insert(DagInstanceDTO instanceDTO); + DagInstanceDTO selectSimpleOne(Long dagId); + + Long insert(DagSimpleAddParam param); + + int update(DagSimpleUpdateParam param); void replace(Long dagId, DagGraphVO graph); diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagServiceImpl.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagServiceImpl.java index 5fafbbbfb..062c3c168 100644 --- a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagServiceImpl.java +++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagServiceImpl.java @@ -27,6 +27,8 @@ import cn.sliew.scaleph.dag.service.dto.DagInstanceDTO; import cn.sliew.scaleph.dag.service.dto.DagLinkDTO; import cn.sliew.scaleph.dag.service.dto.DagStepDTO; +import cn.sliew.scaleph.dag.service.param.DagSimpleAddParam; +import cn.sliew.scaleph.dag.service.param.DagSimpleUpdateParam; import cn.sliew.scaleph.dag.service.vo.DagGraphVO; import cn.sliew.scaleph.dag.service.vo.EdgeCellVO; import cn.sliew.scaleph.dag.service.vo.NodeCellVO; @@ -58,10 +60,24 @@ public DagDTO selectOne(Long dagId) { } @Override - public Long insert(DagInstanceDTO instanceDTO) { + public DagInstanceDTO selectSimpleOne(Long dagId) { + return dagInstanceService.selectOne(dagId); + } + + @Override + public Long insert(DagSimpleAddParam param) { + DagInstanceDTO instanceDTO = new DagInstanceDTO(); + BeanUtils.copyProperties(param, instanceDTO); return dagInstanceService.insert(instanceDTO); } + @Override + public int update(DagSimpleUpdateParam param) { + DagInstanceDTO instanceDTO = new DagInstanceDTO(); + BeanUtils.copyProperties(param, instanceDTO); + return dagInstanceService.update(instanceDTO); + } + @Override public void replace(Long dagId, DagGraphVO graph) { saveSteps(dagId, graph.getNodes()); diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagSimpleAddParam.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagSimpleAddParam.java new file mode 100644 index 000000000..459ab1980 --- /dev/null +++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagSimpleAddParam.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.dag.service.param; + +import com.fasterxml.jackson.databind.JsonNode; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +@Data +public class DagSimpleAddParam { + + @Schema(description = "DAG元信息") + private JsonNode dagMeta; + + @Schema(description = "DAG属性") + private JsonNode dagAttrs; +} diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagSimpleUpdateParam.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagSimpleUpdateParam.java new file mode 100644 index 000000000..36d636863 --- /dev/null +++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagSimpleUpdateParam.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.dag.service.param; + +import com.fasterxml.jackson.databind.JsonNode; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import javax.validation.constraints.NotNull; + +@Data +public class DagSimpleUpdateParam { + + @NotNull + @Schema(description = "id") + private Long id; + + @Schema(description = "DAG元信息") + private JsonNode dagMeta; + + @Schema(description = "DAG属性") + private JsonNode dagAttrs; +} diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/config-graph.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/config-graph.tsx index 80a76502d..dbbc936ba 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/config-graph.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/config-graph.tsx @@ -214,25 +214,25 @@ export const useGraphConfig = createGraphConfig((config) => { const edgeAdded: IEvent<'edge:added'> = { eventName: 'edge:added', callback: (eventArgs, command, modelService) => { - console.log('edge:added', eventArgs, e) + console.log('edge:added', eventArgs) }, } const edgeRemoved: IEvent<'edge:removed'> = { eventName: 'edge:removed', callback: (eventArgs, command, modelService) => { - console.log('edge:removed', eventArgs, e) + console.log('edge:removed', eventArgs) }, } const nodeAdded: IEvent<'node:added'> = { eventName: 'node:added', callback: (eventArgs, command, modelService) => { - console.log('node:added', eventArgs, e) + console.log('node:added', eventArgs) }, } const nodeRemoved: IEvent<'node:removed'> = { eventName: 'node:removed', callback: (eventArgs, command, modelService) => { - console.log('node:removed', eventArgs, e) + console.log('node:removed', eventArgs) }, } config.setEvents([edgeAdded, edgeRemoved, nodeAdded, nodeRemoved])