Skip to content

Commit

Permalink
[Fix #15129] [Dependent] The date rules of the dependent node are amb…
Browse files Browse the repository at this point in the history
…iguous. (#15289)

* [Fix-15129][Dependent] Fix the ambiguity in date rules for dependent node.

* [fix #15129] Revert ddl

* restore findLastProcessInterval

* update: mvn spotless:apply

---------

Co-authored-by: 李乐 <lile@boimc.com>
Co-authored-by: xiangzihao <460888207@qq.com>
  • Loading branch information
3 people authored Dec 8, 2023
1 parent 0bb48f3 commit 0c470ff
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,14 @@ List<TaskInstance> loadAllInfosNoRelease(@Param("processInstanceId") int process
* find last task instance list in the date interval
*
* @param taskCodes taskCodes
* @param startTime startTime
* @param endTime endTime
* @param testFlag testFlag
* @return task instance list
*/
List<TaskInstance> findLastTaskInstances(@Param("taskCodes") Set<Long> taskCodes,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
List<TaskInstance> findLastTaskInstances(@Param("processInstanceId") Integer processInstanceId,
@Param("taskCodes") Set<Long> taskCodes,
@Param("testFlag") int testFlag);

TaskInstance findLastTaskInstance(@Param("taskCode") long depTaskCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
TaskInstance findLastTaskInstance(@Param("processInstanceId") Integer processInstanceId,
@Param("taskCode") long depTaskCode,
@Param("testFlag") int testFlag);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;

import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -91,20 +90,21 @@ public interface TaskInstanceDao extends IDao<TaskInstance> {
/**
* find last task instance list corresponding to taskCodes in the date interval
*
* @param processInstanceId Task's parent process instance id
* @param taskCodes taskCodes
* @param dateInterval dateInterval
* @param testFlag test flag
* @return task instance list
*/
List<TaskInstance> queryLastTaskInstanceListIntervalByTaskCodes(Set<Long> taskCodes, DateInterval dateInterval,
int testFlag);
List<TaskInstance> queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId,
Set<Long> taskCodes, int testFlag);

/**
* find last task instance corresponding to taskCode in the date interval
* @param processInstanceId Task's parent process instance id
* @param depTaskCode taskCode
* @param dateInterval dateInterval
* @param testFlag test flag
* @return task instance
*/
TaskInstance queryLastTaskInstanceIntervalByTaskCode(long depTaskCode, DateInterval dateInterval, int testFlag);
TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId,
long depTaskCode, int testFlag);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -174,16 +173,15 @@ public List<TaskInstance> queryByWorkflowInstanceId(Integer workflowInstanceId)
}

@Override
public List<TaskInstance> queryLastTaskInstanceListIntervalByTaskCodes(Set<Long> taskCodes,
DateInterval dateInterval, int testFlag) {
return mybatisMapper.findLastTaskInstances(taskCodes, dateInterval.getStartTime(), dateInterval.getEndTime(),
testFlag);
public List<TaskInstance> queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId,
Set<Long> taskCodes,
int testFlag) {
return mybatisMapper.findLastTaskInstances(processInstanceId, taskCodes, testFlag);
}

@Override
public TaskInstance queryLastTaskInstanceIntervalByTaskCode(long depTaskCode, DateInterval dateInterval,
int testFlag) {
return mybatisMapper.findLastTaskInstance(depTaskCode, dateInterval.getStartTime(), dateInterval.getEndTime(),
testFlag);
public TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId, long depTaskCode,
int testFlag) {
return mybatisMapper.findLastTaskInstance(processInstanceId, depTaskCode, testFlag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,27 +350,25 @@
select task_code, max(end_time) as max_end_time
from t_ds_task_instance
where 1=1 and test_flag = #{testFlag}
and instance.process_instance_id = #{processInstanceId}
<if test="taskCodes != null and taskCodes.size() != 0">
and task_code in
<foreach collection="taskCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="startTime!=null and endTime != null">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
</if>
group by task_code
) t_max
on instance.task_code = t_max.task_code and instance.end_time = t_max.max_end_time
on instance.process_instance_id = t_max.process_instance_id
and instance.task_code = t_max.task_code
and instance.end_time = t_max.max_end_time
</select>
<select id="findLastTaskInstance" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where task_code = #{taskCode}
<if test="startTime!=null and endTime != null">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
</if>
where process_instance_id = #{processInstanceId}
and task_code = #{taskCode}
order by end_time desc limit 1
</select>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,9 @@ private DependResult calculateResultForTasks(DependentItem dependentItem,
if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_WORKFLOW_CODE) {
result = dependResultByProcessInstance(processInstance);
} else if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
result = dependResultByAllTaskOfProcessInstance(processInstance, dateInterval, testFlag);
result = dependResultByAllTaskOfProcessInstance(processInstance, testFlag);
} else {
result = dependResultBySingleTaskInstance(processInstance, dependentItem.getDepTaskCode(), dateInterval,
testFlag);
result = dependResultBySingleTaskInstance(processInstance, dependentItem.getDepTaskCode(), testFlag);
}
if (result != DependResult.SUCCESS) {
break;
Expand Down Expand Up @@ -194,8 +193,7 @@ private DependResult dependResultByProcessInstance(ProcessInstance processInstan
*
* @return
*/
private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance,
DateInterval dateInterval, int testFlag) {
private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance, int testFlag) {
if (!processInstance.getState().isFinished()) {
log.info("Wait for the dependent workflow to complete, processCode: {}, processInstanceId: {}.",
processInstance.getProcessDefinitionCode(), processInstance.getId());
Expand All @@ -212,8 +210,8 @@ private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance proc
.collect(Collectors.toMap(TaskDefinitionLog::getCode, TaskDefinitionLog::getName));

List<TaskInstance> taskInstanceList =
taskInstanceDao.queryLastTaskInstanceListIntervalByTaskCodes(taskDefinitionCodeMap.keySet(),
dateInterval, testFlag);
taskInstanceDao.queryLastTaskInstanceListIntervalInProcessInstance(processInstance.getId(),
taskDefinitionCodeMap.keySet(), testFlag);
Map<Long, TaskExecutionStatus> taskExecutionStatusMap =
taskInstanceList.stream()
.filter(taskInstance -> taskInstance.getTaskExecuteType() != TaskExecuteType.STREAM)
Expand Down Expand Up @@ -245,14 +243,14 @@ private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance proc
*
* @param processInstance last process instance in the date interval
* @param depTaskCode the dependent task code
* @param dateInterval date interval
* @param testFlag test flag
* @return depend result
*/
private DependResult dependResultBySingleTaskInstance(ProcessInstance processInstance, long depTaskCode,
DateInterval dateInterval, int testFlag) {
int testFlag) {
TaskInstance taskInstance =
taskInstanceDao.queryLastTaskInstanceIntervalByTaskCode(depTaskCode, dateInterval, testFlag);
taskInstanceDao.queryLastTaskInstanceIntervalInProcessInstance(processInstance.getId(),
depTaskCode, testFlag);

if (taskInstance == null) {
TaskDefinition taskDefinition = taskDefinitionDao.queryByCode(depTaskCode);
Expand Down

0 comments on commit 0c470ff

Please sign in to comment.