From 94e39c65fb4ac3fa9c8dc18e5b5a8168b04fb1b9 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 26 Nov 2024 21:23:06 +0800 Subject: [PATCH] [Fix-16811] [JdbcRegistry] Clear the jdbc EPHEMERAL data and lock which client is not exist (#16837) --- .../plugin/registry/jdbc/JdbcRegistry.java | 1 + .../client/JdbcRegistryClientIdentify.java | 6 +- .../DTO/JdbcRegistryClientHeartbeatDTO.java | 4 +- .../JdbcRegistryLockRepository.java | 8 ++ .../jdbc/server/IJdbcRegistryDataManager.java | 5 ++ .../jdbc/server/JdbcRegistryDataManager.java | 7 +- .../jdbc/server/JdbcRegistryServer.java | 81 +++++++++++-------- 7 files changed, 70 insertions(+), 42 deletions(-) diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java index cc646e12e580..11e3f62172eb 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java @@ -262,6 +262,7 @@ public void close() { log.info("Closing Jdbc Registry..."); // remove the current Ephemeral node, if can connect to jdbc try (JdbcRegistryClient closed1 = jdbcRegistryClient) { + JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow(); } catch (Exception e) { log.error("Close Jdbc Registry error", e); } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClientIdentify.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClientIdentify.java index b6c8e0986e0d..008b9a3a9d75 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClientIdentify.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClientIdentify.java @@ -18,11 +18,9 @@ package org.apache.dolphinscheduler.plugin.registry.jdbc.client; import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.ToString; +import lombok.Data; -@ToString -@Getter +@Data @AllArgsConstructor public class JdbcRegistryClientIdentify { diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DTO/JdbcRegistryClientHeartbeatDTO.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DTO/JdbcRegistryClientHeartbeatDTO.java index 1f006291f577..5caa00eeab67 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DTO/JdbcRegistryClientHeartbeatDTO.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DTO/JdbcRegistryClientHeartbeatDTO.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryClientHeartbeat; import java.util.Date; +import java.util.concurrent.TimeUnit; import lombok.AllArgsConstructor; import lombok.Builder; @@ -36,7 +37,6 @@ public class JdbcRegistryClientHeartbeatDTO { private Long id; - // clientName private String clientName; private Long lastHeartbeatTime; @@ -90,7 +90,7 @@ public JdbcRegistryClientHeartbeatDTO clone() { public static class ClientConfig { @Builder.Default - private long sessionTimeout = 60 * 1000L; + private long sessionTimeout = TimeUnit.SECONDS.toMillis(60); } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/repository/JdbcRegistryLockRepository.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/repository/JdbcRegistryLockRepository.java index ef7e23fbbf98..7133b3e534ce 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/repository/JdbcRegistryLockRepository.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/repository/JdbcRegistryLockRepository.java @@ -26,6 +26,7 @@ import org.apache.commons.collections4.CollectionUtils; import java.util.List; +import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; @@ -36,6 +37,13 @@ public class JdbcRegistryLockRepository { @Autowired private JdbcRegistryLockMapper jdbcRegistryLockMapper; + public List queryAll() { + return jdbcRegistryLockMapper.selectList(null) + .stream() + .map(JdbcRegistryLockDTO::fromJdbcRegistryLock) + .collect(Collectors.toList()); + } + public void deleteByClientIds(List clientIds) { if (CollectionUtils.isEmpty(clientIds)) { return; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/IJdbcRegistryDataManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/IJdbcRegistryDataManager.java index 1c12f8cd8e09..86db4de6abf7 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/IJdbcRegistryDataManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/IJdbcRegistryDataManager.java @@ -27,6 +27,11 @@ public interface IJdbcRegistryDataManager { boolean existKey(String key); + /** + * Get all the {@link JdbcRegistryDataDTO}. + */ + List getAllJdbcRegistryData(); + /** * Get the {@link JdbcRegistryDataDTO} by key. */ diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java index 5d44949b3ffd..e0f091bde7d9 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java @@ -135,6 +135,11 @@ public boolean existKey(String key) { return jdbcRegistryDataRepository.selectByKey(key).isPresent(); } + @Override + public List getAllJdbcRegistryData() { + return jdbcRegistryDataRepository.selectAll(); + } + @Override public Optional getRegistryDataByKey(String key) { checkNotNull(key); @@ -212,7 +217,7 @@ public void deleteJdbcRegistryDataByKey(String key) { return; } jdbcRegistryDataRepository.deleteByKey(key); - JdbcRegistryDataChanceEventDTO registryDataChanceEvent = JdbcRegistryDataChanceEventDTO.builder() + final JdbcRegistryDataChanceEventDTO registryDataChanceEvent = JdbcRegistryDataChanceEventDTO.builder() .jdbcRegistryData(jdbcRegistryDataOptional.get()) .eventType(JdbcRegistryDataChanceEventDTO.EventType.DELETE) .createTime(new Date()) diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java index 0ac985dae32e..e04360bc6f03 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java @@ -33,11 +33,13 @@ import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.time.StopWatch; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -100,25 +102,22 @@ public void start() { // The server is already started or stopped, will not start again. return; } - // Purge the previous client to avoid the client is still in the registry. - purgePreviousJdbcRegistryClient(); // Start the Purge thread - // The Purge thread will remove the client from the registry, and remove it's related data and lock. - // Connect to the database, load the data and lock. - purgeDeadJdbcRegistryClient(); - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor() - .scheduleWithFixedDelay(this::purgeDeadJdbcRegistryClient, - jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), - jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), - TimeUnit.MILLISECONDS); + // The Purge thread will clear the invalidated data + purgeInvalidJdbcRegistryMetadata(); + JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( + this::purgeInvalidJdbcRegistryMetadata, + jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), + jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), + TimeUnit.MILLISECONDS); jdbcRegistryDataManager.start(); jdbcRegistryServerState = JdbcRegistryServerState.STARTED; doTriggerOnConnectedListener(); - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor() - .scheduleWithFixedDelay(this::refreshClientsHeartbeat, - 0, - jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), - TimeUnit.MILLISECONDS); + JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( + this::refreshClientsHeartbeat, + 0, + jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), + TimeUnit.MILLISECONDS); } @SneakyThrows @@ -139,9 +138,8 @@ public void registerClient(IJdbcRegistryClient jdbcRegistryClient) { .lastHeartbeatTime(System.currentTimeMillis()) .build(); - while (jdbcRegistryClientDTOMap.containsKey(jdbcRegistryClientIdentify)) { - log.warn("The client {} is already exist the registry.", jdbcRegistryClientIdentify.getClientId()); - Thread.sleep(jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis()); + if (jdbcRegistryClientDTOMap.containsKey(jdbcRegistryClientIdentify)) { + throw new IllegalArgumentException("The client is already registered: " + jdbcRegistryClientIdentify); } jdbcRegistryClientRepository.insert(registryClientDTO); jdbcRegistryClients.add(jdbcRegistryClient); @@ -260,34 +258,47 @@ public void close() { jdbcRegistryClientDTOMap.clear(); } - private void purgePreviousJdbcRegistryClient() { + private void purgeInvalidJdbcRegistryMetadata() { + final StopWatch stopWatch = StopWatch.createStarted(); if (jdbcRegistryServerState == JdbcRegistryServerState.STOPPED) { return; } - List previousJdbcRegistryClientIds = jdbcRegistryClientRepository.queryAll() - .stream() - .filter(jdbcRegistryClientHeartbeat -> jdbcRegistryClientHeartbeat.getClientName() - .equals(jdbcRegistryProperties.getJdbcRegistryClientName())) - .map(JdbcRegistryClientHeartbeatDTO::getId) - .collect(Collectors.toList()); - doPurgeJdbcRegistryClientInDB(previousJdbcRegistryClientIds); - - } - - private void purgeDeadJdbcRegistryClient() { - if (jdbcRegistryServerState == JdbcRegistryServerState.STOPPED) { - return; - } - List deadJdbcRegistryClientIds = jdbcRegistryClientRepository.queryAll() + // remove the client which is already dead from the registry, and remove it's related data and lock. + final List jdbcRegistryClients = jdbcRegistryClientRepository.queryAll(); + final List deadJdbcRegistryClientIds = jdbcRegistryClients .stream() .filter(JdbcRegistryClientHeartbeatDTO::isDead) .map(JdbcRegistryClientHeartbeatDTO::getId) .collect(Collectors.toList()); doPurgeJdbcRegistryClientInDB(deadJdbcRegistryClientIds); + // remove the data and lock which client is not exist. + final Set existJdbcRegistryClientIds = jdbcRegistryClients + .stream() + .map(JdbcRegistryClientHeartbeatDTO::getId) + .collect(Collectors.toSet()); + jdbcRegistryDataManager.getAllJdbcRegistryData() + .stream() + .filter(jdbcRegistryDataDTO -> !existJdbcRegistryClientIds.contains(jdbcRegistryDataDTO.getClientId())) + .filter(jdbcRegistryDataDTO -> DataType.EPHEMERAL.name().equals(jdbcRegistryDataDTO.getDataType())) + .forEach(jdbcRegistryData -> { + log.info("Remove the JdbcRegistryData: {} which client is not exist in the registry", + jdbcRegistryData); + jdbcRegistryDataManager.deleteJdbcRegistryDataByKey(jdbcRegistryData.getDataKey()); + }); + jdbcRegistryLockRepository.queryAll() + .stream() + .filter(jdbcRegistryLockDTO -> !existJdbcRegistryClientIds.contains(jdbcRegistryLockDTO.getClientId())) + .forEach(jdbcRegistryLockDTO -> { + log.info("Remove the JdbcRegistryLock: {} which client is not exist in the registry", + jdbcRegistryLockDTO); + jdbcRegistryLockRepository.deleteById(jdbcRegistryLockDTO.getId()); + }); + stopWatch.stop(); + log.debug("Success purge invalid jdbcRegistryMetadata, cost: {} ms", stopWatch.getTime()); } - private void doPurgeJdbcRegistryClientInDB(List jdbcRegistryClientIds) { + private void doPurgeJdbcRegistryClientInDB(final List jdbcRegistryClientIds) { if (CollectionUtils.isEmpty(jdbcRegistryClientIds)) { return; }