Skip to content

Commit

Permalink
feature: update workflow task dag
Browse files Browse the repository at this point in the history
  • Loading branch information
wangqi committed Apr 11, 2024
1 parent e925b4d commit 7880c15
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
* limitations under the License.
*/

package cn.sliew.scaleph.workflow.scheduler.quartz;

import cn.sliew.scaleph.workflow.engine.action.ActionContext;
package cn.sliew.scaleph.workflow.engine.action;

import java.util.Date;
import java.util.Map;
Expand All @@ -41,6 +39,17 @@ public ActionContextBuilder withWorkflowInstanceId(Long workflowInstanceId) {
return this;
}


public ActionContextBuilder withWorkflowTaskDefinitionId(Long stepId) {
context.setWorkflowTaskDefinitionId(stepId);
return this;
}

public ActionContextBuilder withWorkflowTaskInstanceId(Long workflowTaskInstanceId) {
context.setWorkflowTaskInstanceId(workflowTaskInstanceId);
return this;
}

public ActionContextBuilder withPreviousFireTime(Date previousFireTime) {
context.setPreviousFireTime(previousFireTime);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,38 @@

package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.milky.common.exception.Rethrower;
import cn.sliew.milky.common.filter.ActionListener;
import cn.sliew.scaleph.common.util.SpringApplicationContextUtil;
import cn.sliew.scaleph.queue.MessageListener;
import cn.sliew.scaleph.workflow.engine.Engine;
import cn.sliew.scaleph.workflow.engine.EngineBuilder;
import cn.sliew.scaleph.workflow.engine.action.Action;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionContextBuilder;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.ParallelFlow;
import cn.sliew.scaleph.workflow.engine.workflow.WorkFlow;
import cn.sliew.scaleph.workflow.service.WorkflowDefinitionService;
import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionDTO2;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO;
import cn.sliew.scaleph.workflow.statemachine.WorkflowTaskInstanceStateMachine;
import org.apache.commons.lang3.RandomUtils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.annotation.RInject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ClassUtils;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@MessageListener(topic = WorkflowTaskInstanceDeployEventListener.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP)
public class WorkflowTaskInstanceDeployEventListener extends AbstractWorkflowTaskInstanceEventListener {

public static final String TOPIC = "TOPIC_WORKFLOW_TASK_INSTANCE_COMMAND_DEPLOY";

public static Engine engine = EngineBuilder.newInstance().build();

@Override
protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) {
CompletableFuture<?> future = executorService.submit(new DeployRunner(event)).toCompletableFuture();
Expand All @@ -47,13 +63,16 @@ protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event)
return future;
}

@Slf4j
public static class DeployRunner implements Runnable, Serializable {

private WorkflowTaskInstanceEventDTO event;

@RInject
private String taskId;
@Autowired
private WorkflowDefinitionService workflowDefinitionService;
@Autowired
private WorkflowTaskInstanceService workflowTaskInstanceService;

public DeployRunner(WorkflowTaskInstanceEventDTO event) {
Expand All @@ -62,13 +81,43 @@ public DeployRunner(WorkflowTaskInstanceEventDTO event) {

@Override
public void run() {
workflowTaskInstanceService.updateTaskId(event.getWorkflowTaskInstanceId(), taskId);
workflowTaskInstanceService.updateState(event.getWorkflowTaskInstanceId(), event.getState(), event.getNextState(), null);

WorkflowTaskInstanceDTO workflowTaskInstanceDTO = workflowTaskInstanceService.get(event.getWorkflowTaskInstanceId());
WorkflowTaskDefinitionDTO2 taskDefinition = workflowDefinitionService.getTaskDefinition(workflowTaskInstanceDTO.getStepId());

try {
workflowTaskInstanceService.updateTaskId(event.getWorkflowTaskInstanceId(), taskId);
workflowTaskInstanceService.updateState(event.getWorkflowTaskInstanceId(), event.getState(), event.getNextState(), null);
TimeUnit.SECONDS.sleep(RandomUtils.nextLong(1, 30));
} catch (InterruptedException e) {
throw new RuntimeException(e);
Class<?> clazz = ClassUtils.forName(taskDefinition.getStepMeta().getHandler(), ClassUtils.getDefaultClassLoader());
Action action = (Action) SpringApplicationContextUtil.getBean(clazz);
WorkFlow workFlow = ParallelFlow.newParallelFlow()
.name(taskDefinition.getStepName())
.execute(action)
.build();
ActionContext actionContext = buildActionContext(workflowTaskInstanceDTO);
engine.run(workFlow, actionContext, new ActionListener<ActionResult>() {
@Override
public void onResponse(ActionResult result) {
log.debug("workflow task {} run success!", taskDefinition.getStepName());
}

@Override
public void onFailure(Throwable e) {
log.error("workflow task {} run failure!", taskDefinition.getStepName(), e);
}
});
} catch (ClassNotFoundException e) {
Rethrower.throwAs(e);
}
}

private ActionContext buildActionContext(WorkflowTaskInstanceDTO workflowTaskInstanceDTO) {
return ActionContextBuilder.newBuilder()
.withWorkflowDefinitionId(workflowTaskInstanceDTO.getWorkflowInstanceDTO().getWorkflowDefinition().getId())
.withWorkflowInstanceId(workflowTaskInstanceDTO.getWorkflowInstanceDTO().getId())
.withWorkflowTaskDefinitionId(workflowTaskInstanceDTO.getStepId())
.withWorkflowTaskInstanceId(workflowTaskInstanceDTO.getId())
.validateAndBuild();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,21 @@

package cn.sliew.scaleph.workflow.scheduler.quartz;

import cn.sliew.milky.common.exception.Rethrower;
import cn.sliew.milky.common.filter.ActionListener;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.util.SpringApplicationContextUtil;
import cn.sliew.scaleph.dao.entity.master.workflow.WorkflowSchedule;
import cn.sliew.scaleph.workflow.engine.Engine;
import cn.sliew.scaleph.workflow.engine.EngineBuilder;
import cn.sliew.scaleph.workflow.engine.action.Action;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.ParallelFlow;
import cn.sliew.scaleph.workflow.engine.workflow.WorkFlow;
import cn.sliew.scaleph.workflow.service.WorkflowDefinitionService;
import cn.sliew.scaleph.workflow.service.WorkflowInstanceService;
import cn.sliew.scaleph.workflow.service.dto.WorkflowDefinitionDTO;
import cn.sliew.scaleph.workflow.service.dto.WorkflowInstanceDTO;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionDTO2;
import com.google.common.graph.Graph;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.util.ClassUtils;

import java.util.Set;

@Slf4j
public class QuartzJobHandler extends QuartzJobBean {

private Engine engine = EngineBuilder.newInstance().build();

@Autowired
private WorkflowDefinitionService workflowDefinitionService;
@Autowired
Expand All @@ -68,49 +50,6 @@ protected void executeInternal(JobExecutionContext context) throws JobExecutionE
String json = dataMap.getString(QuartzUtil.WORKFLOW_SCHEDULE);
WorkflowSchedule workflowSchedule = JacksonUtil.parseJsonString(json, WorkflowSchedule.class);
WorkflowDefinitionDTO workflowDefinitionDTO = workflowDefinitionService.get(workflowSchedule.getWorkflowDefinitionId());
WorkflowInstanceDTO workflowInstanceDTO = workflowInstanceService.deploy(workflowDefinitionDTO);

// todo 以下全部移除
ActionContext actionContext = buildActionContext(context, workflowDefinitionDTO, workflowInstanceDTO);
Graph<WorkflowTaskDefinitionDTO2> dag = workflowDefinitionService.getDag(workflowDefinitionDTO.getId());
Set<WorkflowTaskDefinitionDTO2> workflowTaskDefinitionDTOS = dag.nodes();
// fixme 应该是对 task 的上下游关系进行梳理后,进而执行
Action[] actions = workflowTaskDefinitionDTOS.stream().map(workflowTaskDefinition -> {
try {
Class<?> clazz = ClassUtils.forName(workflowTaskDefinition.getStepMeta().getHandler(), ClassUtils.getDefaultClassLoader());
return (Action) SpringApplicationContextUtil.getBean(clazz);
} catch (ClassNotFoundException e) {
Rethrower.throwAs(e);
return null;
}
}).toArray(length -> new Action[length]);
WorkFlow workFlow = ParallelFlow.newParallelFlow()
.name(workflowDefinitionDTO.getName())
.execute(actions)
.build();
engine.run(workFlow, actionContext, new ActionListener<ActionResult>() {
@Override
public void onResponse(ActionResult result) {
log.debug("workflow {} run success!", workflowDefinitionDTO.getName());
}

@Override
public void onFailure(Throwable e) {
log.error("workflow {} run failure!", workflowDefinitionDTO.getName(), e);
}
});
workflowInstanceService.deploy(workflowDefinitionDTO);
}

private ActionContext buildActionContext(JobExecutionContext context, WorkflowDefinitionDTO definitionDTO, WorkflowInstanceDTO instanceDTO) {
return ActionContextBuilder.newBuilder()
.withWorkflowDefinitionId(definitionDTO.getId())
.withWorkflowInstanceId(instanceDTO.getId())
.withParams(definitionDTO.getParam())
.withPreviousFireTime(context.getPreviousFireTime())
.withNextFireTime(context.getNextFireTime())
.withScheduledFireTime(context.getScheduledFireTime())
.withFireTime(context.getFireTime())
.validateAndBuild();
}

}

0 comments on commit 7880c15

Please sign in to comment.