diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertEventFetcher.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertEventFetcher.java index 11a668ae1ded..010e6266b559 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertEventFetcher.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertEventFetcher.java @@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; @Slf4j @Component @@ -40,7 +41,11 @@ public AlertEventFetcher(AlertHAServer alertHAServer, } @Override + @Transactional public List fetchPendingEvent(int eventOffset) { + // We use transaction here to ensure that if mysql is configured at master/slave mode, this query will be routed + // to the master db. + // Avoid we query from the slave and find the data is not the latest. return alertDao.listPendingAlerts(eventOffset); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java index cb1eb36bcc10..cc41b5744b86 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java @@ -34,8 +34,8 @@ import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionTemplate; /** * Use to watch the worker group from database and notify the change. @@ -44,16 +44,22 @@ @Component public class WorkerGroupChangeNotifier { - @Autowired - private MasterConfig masterConfig; + private final MasterConfig masterConfig; + + private final TransactionTemplate transactionTemplate; private final WorkerGroupDao workerGroupDao; + private final List listeners = new CopyOnWriteArrayList<>(); private Map workerGroupMap = new HashMap<>(); - public WorkerGroupChangeNotifier(WorkerGroupDao workerGroupDao) { + public WorkerGroupChangeNotifier(final MasterConfig masterConfig, + final WorkerGroupDao workerGroupDao, + final TransactionTemplate transactionTemplate) { + this.masterConfig = masterConfig; this.workerGroupDao = workerGroupDao; + this.transactionTemplate = transactionTemplate; } public void start() { @@ -85,10 +91,15 @@ Map getWorkerGroupMap() { } private MapComparator detectChangedWorkerGroups() { - Map tmpWorkerGroupMap = workerGroupDao.queryAll() - .stream() - .collect(Collectors.toMap(WorkerGroup::getName, workerGroup -> workerGroup)); - return new MapComparator<>(workerGroupMap, tmpWorkerGroupMap); + // We use transaction here to ensure that if mysql is configured at master/slave mode, this query will be routed + // to the master db. + // Avoid we query from the slave and find the data is not the latest. + return transactionTemplate.execute(status -> { + Map tmpWorkerGroupMap = workerGroupDao.queryAll() + .stream() + .collect(Collectors.toMap(WorkerGroup::getName, workerGroup -> workerGroup)); + return new MapComparator<>(workerGroupMap, tmpWorkerGroupMap); + }); } private void triggerListeners(MapComparator mapComparator) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/IdSlotBasedCommandFetcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/IdSlotBasedCommandFetcher.java index f76a0dfa470b..108fd4336f83 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/IdSlotBasedCommandFetcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/IdSlotBasedCommandFetcher.java @@ -28,6 +28,8 @@ import lombok.extern.slf4j.Slf4j; +import org.springframework.transaction.annotation.Transactional; + /** * The command fetcher which is fetch commands by command id and slot. */ @@ -48,7 +50,10 @@ public IdSlotBasedCommandFetcher(CommandFetchStrategy.IdSlotBasedFetchConfig idS this.commandDao = commandDao; } + // We use transaction here to ensure that if mysql is configured at master/slave mode, this query will be routed to + // the master db. @Override + @Transactional public List fetchCommands() { long scheduleStartTime = System.currentTimeMillis(); if (!masterSlotManager.checkSlotValid()) { diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifierTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifierTest.java index 2f7098439fe5..ec2f9902aa36 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifierTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifierTest.java @@ -22,12 +22,15 @@ import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionTemplate; import com.google.common.collect.Lists; @@ -36,7 +39,10 @@ class WorkerGroupChangeNotifierTest { @Test void detectWorkerGroupChanges_addedWorkerGroup() { WorkerGroupDao workerGroupDao = Mockito.mock(WorkerGroupDao.class); - WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(workerGroupDao); + WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier( + new MasterConfig(), + workerGroupDao, + new MockTransactionTemplate()); WorkerGroup workerGroup1 = WorkerGroup.builder() .name("workerGroup1") @@ -74,7 +80,10 @@ public void onWorkerGroupChange(List workerGroups) { @Test void detectWorkerGroupChanges_deleteWorkerGroup() { WorkerGroupDao workerGroupDao = Mockito.mock(WorkerGroupDao.class); - WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(workerGroupDao); + WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier( + new MasterConfig(), + workerGroupDao, + new MockTransactionTemplate()); WorkerGroup workerGroup1 = WorkerGroup.builder() .name("workerGroup1") @@ -115,7 +124,10 @@ public void onWorkerGroupChange(List workerGroups) { @Test void detectWorkerGroupChanges_updateWorkerGroup() { WorkerGroupDao workerGroupDao = Mockito.mock(WorkerGroupDao.class); - WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(workerGroupDao); + WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier( + new MasterConfig(), + workerGroupDao, + new MockTransactionTemplate()); WorkerGroup workerGroup1 = WorkerGroup.builder() .name("workerGroup1") @@ -156,4 +168,11 @@ public void onWorkerGroupChange(List workerGroups) { assertThat(workerGroupDeleted.get()).isFalse(); assertThat(workerGroupChangeNotifier.getWorkerGroupMap()).containsEntry("workerGroup1", updatedWorkerGroup1); } + + public static class MockTransactionTemplate extends TransactionTemplate { + + public T execute(final TransactionCallback action) { + return action.doInTransaction(null); + } + } }