Skip to content

Commit

Permalink
[Improvement-16872][Master] Select a coordinator from masters to wake…
Browse files Browse the repository at this point in the history
… up task group (#16873)

* Add MasterCoordinator
* Add MasterCoordinatorListener
* Release the task group queue slot when task instance failover
  • Loading branch information
ruanwenjun authored Dec 18, 2024
1 parent 1eedc16 commit 9f67cc4
Show file tree
Hide file tree
Showing 25 changed files with 432 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public enum TaskGroupQueueStatus {

WAIT_QUEUE(-1, "wait queue"),
ACQUIRE_SUCCESS(1, "acquire success"),
@Deprecated
RELEASE(2, "release");

@EnumValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@
@NoArgsConstructor
public class MasterHeartBeat extends BaseHeartBeat implements HeartBeat {

private boolean isCoordinator;

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public TaskDefinitionLog(TaskDefinition taskDefinition) {
this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
this.setFlag(taskDefinition.getFlag());
this.setModifyBy(taskDefinition.getModifyBy());
this.setTaskGroupId(taskDefinition.getTaskGroupId());
this.setTaskGroupPriority(taskDefinition.getTaskGroupPriority());
this.setCpuQuota(taskDefinition.getCpuQuota());
this.setMemoryMax(taskDefinition.getMemoryMax());
this.setTaskExecuteType(taskDefinition.getTaskExecuteType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
import org.apache.dolphinscheduler.server.master.cluster.ClusterStateMonitors;
import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEngine;
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBusFireWorker;
Expand Down Expand Up @@ -95,6 +96,9 @@ public class MasterServer implements IStoppable {
@Autowired
private SystemEventBusFireWorker systemEventBusFireWorker;

@Autowired
private MasterCoordinator masterCoordinator;

public static void main(String[] args) {
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);

Expand Down Expand Up @@ -122,6 +126,8 @@ public void initialized() {
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);

this.masterCoordinator.start();

this.clusterManager.start();
this.clusterStateMonitors.start();

Expand Down Expand Up @@ -173,6 +179,7 @@ public void close(String cause) {
WorkflowEngine workflowEngine1 = workflowEngine;
SchedulerApi closedSchedulerApi = schedulerApi;
MasterRpcServer closedRpcServer = masterRPCServer;
MasterCoordinator closeMasterCoordinator = masterCoordinator;
MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine;

import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;

/**
* The TaskGroupCoordinator use to manage the task group slot. The task group slot is used to limit the number of {@link TaskInstance} that can be run at the same time.
* <p>
* The {@link TaskGroupQueue} is used to represent the task group slot. When a {@link TaskGroupQueue} which inQueue is YES means the {@link TaskGroupQueue} is using by a {@link TaskInstance}.
* <p>
* When the {@link TaskInstance} need to use task group, we should use @{@link ITaskGroupCoordinator#acquireTaskGroupSlot(TaskInstance)} to acquire the task group slot,
* this method doesn't block should always acquire successfully, and you should directly stop dispatch the task instance.
* When the task group slot is available, the ITaskGroupCoordinator will wake up the waiting {@link TaskInstance} to dispatch.
* <pre>
* if(needAcquireTaskGroupSlot(taskInstance)) {
* taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
* return;
* }
* </pre>
* <p>
* When the {@link TaskInstance} is finished, we should use @{@link ITaskGroupCoordinator#releaseTaskGroupSlot(TaskInstance)} to release the task group slot.
* <pre>
* if(needToReleaseTaskGroupSlot(taskInstance)) {
* taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
* }
* </pre>
*/
public interface ITaskGroupCoordinator extends AutoCloseable {

/**
* Start the TaskGroupCoordinator, once started, you cannot call this method until you have closed the coordinator.
*/
void start();

/**
* If the {@link TaskInstance#getTaskGroupId()} > 0, and the TaskGroup flag is {@link Flag#YES} then the task instance need to use task group.
*
* @param taskInstance task instance
* @return true if the TaskInstance need to acquireTaskGroupSlot
*/
boolean needAcquireTaskGroupSlot(final TaskInstance taskInstance);

/**
* Acquire the task group slot for the given {@link TaskInstance}.
* <p>
* When taskInstance want to acquire a TaskGroup slot, should call this method. If acquire successfully, will create a TaskGroupQueue in db which is in queue and status is {@link TaskGroupQueueStatus#WAIT_QUEUE}.
* The TaskInstance shouldn't dispatch until there exist available slot, the taskGroupCoordinator notify it.
*
* @param taskInstance the task instance which want to acquire task group slot.
* @throws IllegalArgumentException if the taskInstance is null or the used taskGroup doesn't exist.
*/
void acquireTaskGroupSlot(TaskInstance taskInstance);

/**
* If the TaskInstance is using TaskGroup then it need to release TaskGroupSlot.
*
* @param taskInstance taskInstance
* @return true if the TaskInstance need to release TaskGroupSlot
*/
boolean needToReleaseTaskGroupSlot(TaskInstance taskInstance);

/**
* Release the task group slot for the given {@link TaskInstance}.
* <p>
* When taskInstance want to release a TaskGroup slot, should call this method. The release method will delete the taskGroupQueue.
* This method is idempotent, this means that if the task group slot is already released, this method will do nothing.
*
* @param taskInstance the task instance which want to release task group slot.
* @throws IllegalArgumentException If the taskInstance is null or the task doesn't use task group.
*/
void releaseTaskGroupSlot(TaskInstance taskInstance);

/**
* Close the TaskGroupCoordinator, once closed, the coordinator will not work until you have started the coordinator again.
*/
@Override
void close();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine;

import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
import org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

/**
* The MasterCoordinator is singleton at the clusters, which is used to do some control work, e.g manage the {@link ITaskGroupCoordinator}
*/
@Slf4j
@Component
public class MasterCoordinator extends AbstractHAServer {

private final ITaskGroupCoordinator taskGroupCoordinator;

public MasterCoordinator(final Registry registry,
final MasterConfig masterConfig,
final ITaskGroupCoordinator taskGroupCoordinator) {
super(
registry,
RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(),
masterConfig.getMasterAddress());
this.taskGroupCoordinator = taskGroupCoordinator;
addServerStatusChangeListener(new MasterCoordinatorListener(taskGroupCoordinator));
}

@Override
public void start() {
super.start();
log.info("MasterCoordinator started...");
}

@Override
public void close() {
taskGroupCoordinator.close();
log.info("MasterCoordinator shutdown...");
}

public static class MasterCoordinatorListener extends AbstractServerStatusChangeListener {

private final ITaskGroupCoordinator taskGroupCoordinator;

public MasterCoordinatorListener(ITaskGroupCoordinator taskGroupCoordinator) {
this.taskGroupCoordinator = checkNotNull(taskGroupCoordinator);
}

@Override
public void changeToActive() {
taskGroupCoordinator.start();
}

@Override
public void changeToStandBy() {
taskGroupCoordinator.close();
}
}

}
Loading

0 comments on commit 9f67cc4

Please sign in to comment.