Skip to content

Commit

Permalink
[Fix-16811] [JdbcRegistry] Clear the jdbc EPHEMERAL data and lock whi…
Browse files Browse the repository at this point in the history
…ch client is not exist (#16837)
  • Loading branch information
ruanwenjun authored Nov 26, 2024
1 parent 9d9517a commit 94e39c6
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public void close() {
log.info("Closing Jdbc Registry...");
// remove the current Ephemeral node, if can connect to jdbc
try (JdbcRegistryClient closed1 = jdbcRegistryClient) {
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
} catch (Exception e) {
log.error("Close Jdbc Registry error", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package org.apache.dolphinscheduler.plugin.registry.jdbc.client;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import lombok.Data;

@ToString
@Getter
@Data
@AllArgsConstructor
public class JdbcRegistryClientIdentify {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DO.JdbcRegistryClientHeartbeat;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import lombok.AllArgsConstructor;
import lombok.Builder;
Expand All @@ -36,7 +37,6 @@ public class JdbcRegistryClientHeartbeatDTO {

private Long id;

// clientName
private String clientName;

private Long lastHeartbeatTime;
Expand Down Expand Up @@ -90,7 +90,7 @@ public JdbcRegistryClientHeartbeatDTO clone() {
public static class ClientConfig {

@Builder.Default
private long sessionTimeout = 60 * 1000L;
private long sessionTimeout = TimeUnit.SECONDS.toMillis(60);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.collections4.CollectionUtils;

import java.util.List;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
Expand All @@ -36,6 +37,13 @@ public class JdbcRegistryLockRepository {
@Autowired
private JdbcRegistryLockMapper jdbcRegistryLockMapper;

public List<JdbcRegistryLockDTO> queryAll() {
return jdbcRegistryLockMapper.selectList(null)
.stream()
.map(JdbcRegistryLockDTO::fromJdbcRegistryLock)
.collect(Collectors.toList());
}

public void deleteByClientIds(List<Long> clientIds) {
if (CollectionUtils.isEmpty(clientIds)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public interface IJdbcRegistryDataManager {

boolean existKey(String key);

/**
* Get all the {@link JdbcRegistryDataDTO}.
*/
List<JdbcRegistryDataDTO> getAllJdbcRegistryData();

/**
* Get the {@link JdbcRegistryDataDTO} by key.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public boolean existKey(String key) {
return jdbcRegistryDataRepository.selectByKey(key).isPresent();
}

@Override
public List<JdbcRegistryDataDTO> getAllJdbcRegistryData() {
return jdbcRegistryDataRepository.selectAll();
}

@Override
public Optional<JdbcRegistryDataDTO> getRegistryDataByKey(String key) {
checkNotNull(key);
Expand Down Expand Up @@ -212,7 +217,7 @@ public void deleteJdbcRegistryDataByKey(String key) {
return;
}
jdbcRegistryDataRepository.deleteByKey(key);
JdbcRegistryDataChanceEventDTO registryDataChanceEvent = JdbcRegistryDataChanceEventDTO.builder()
final JdbcRegistryDataChanceEventDTO registryDataChanceEvent = JdbcRegistryDataChanceEventDTO.builder()
.jdbcRegistryData(jdbcRegistryDataOptional.get())
.eventType(JdbcRegistryDataChanceEventDTO.EventType.DELETE)
.createTime(new Date())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import org.apache.dolphinscheduler.registry.api.RegistryException;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -100,25 +102,22 @@ public void start() {
// The server is already started or stopped, will not start again.
return;
}
// Purge the previous client to avoid the client is still in the registry.
purgePreviousJdbcRegistryClient();
// Start the Purge thread
// The Purge thread will remove the client from the registry, and remove it's related data and lock.
// Connect to the database, load the data and lock.
purgeDeadJdbcRegistryClient();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor()
.scheduleWithFixedDelay(this::purgeDeadJdbcRegistryClient,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
// The Purge thread will clear the invalidated data
purgeInvalidJdbcRegistryMetadata();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
this::purgeInvalidJdbcRegistryMetadata,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
jdbcRegistryDataManager.start();
jdbcRegistryServerState = JdbcRegistryServerState.STARTED;
doTriggerOnConnectedListener();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor()
.scheduleWithFixedDelay(this::refreshClientsHeartbeat,
0,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
this::refreshClientsHeartbeat,
0,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
}

@SneakyThrows
Expand All @@ -139,9 +138,8 @@ public void registerClient(IJdbcRegistryClient jdbcRegistryClient) {
.lastHeartbeatTime(System.currentTimeMillis())
.build();

while (jdbcRegistryClientDTOMap.containsKey(jdbcRegistryClientIdentify)) {
log.warn("The client {} is already exist the registry.", jdbcRegistryClientIdentify.getClientId());
Thread.sleep(jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis());
if (jdbcRegistryClientDTOMap.containsKey(jdbcRegistryClientIdentify)) {
throw new IllegalArgumentException("The client is already registered: " + jdbcRegistryClientIdentify);
}
jdbcRegistryClientRepository.insert(registryClientDTO);
jdbcRegistryClients.add(jdbcRegistryClient);
Expand Down Expand Up @@ -260,34 +258,47 @@ public void close() {
jdbcRegistryClientDTOMap.clear();
}

private void purgePreviousJdbcRegistryClient() {
private void purgeInvalidJdbcRegistryMetadata() {
final StopWatch stopWatch = StopWatch.createStarted();
if (jdbcRegistryServerState == JdbcRegistryServerState.STOPPED) {
return;
}
List<Long> previousJdbcRegistryClientIds = jdbcRegistryClientRepository.queryAll()
.stream()
.filter(jdbcRegistryClientHeartbeat -> jdbcRegistryClientHeartbeat.getClientName()
.equals(jdbcRegistryProperties.getJdbcRegistryClientName()))
.map(JdbcRegistryClientHeartbeatDTO::getId)
.collect(Collectors.toList());
doPurgeJdbcRegistryClientInDB(previousJdbcRegistryClientIds);

}

private void purgeDeadJdbcRegistryClient() {
if (jdbcRegistryServerState == JdbcRegistryServerState.STOPPED) {
return;
}
List<Long> deadJdbcRegistryClientIds = jdbcRegistryClientRepository.queryAll()
// remove the client which is already dead from the registry, and remove it's related data and lock.
final List<JdbcRegistryClientHeartbeatDTO> jdbcRegistryClients = jdbcRegistryClientRepository.queryAll();
final List<Long> deadJdbcRegistryClientIds = jdbcRegistryClients
.stream()
.filter(JdbcRegistryClientHeartbeatDTO::isDead)
.map(JdbcRegistryClientHeartbeatDTO::getId)
.collect(Collectors.toList());
doPurgeJdbcRegistryClientInDB(deadJdbcRegistryClientIds);

// remove the data and lock which client is not exist.
final Set<Long> existJdbcRegistryClientIds = jdbcRegistryClients
.stream()
.map(JdbcRegistryClientHeartbeatDTO::getId)
.collect(Collectors.toSet());
jdbcRegistryDataManager.getAllJdbcRegistryData()
.stream()
.filter(jdbcRegistryDataDTO -> !existJdbcRegistryClientIds.contains(jdbcRegistryDataDTO.getClientId()))
.filter(jdbcRegistryDataDTO -> DataType.EPHEMERAL.name().equals(jdbcRegistryDataDTO.getDataType()))
.forEach(jdbcRegistryData -> {
log.info("Remove the JdbcRegistryData: {} which client is not exist in the registry",
jdbcRegistryData);
jdbcRegistryDataManager.deleteJdbcRegistryDataByKey(jdbcRegistryData.getDataKey());
});
jdbcRegistryLockRepository.queryAll()
.stream()
.filter(jdbcRegistryLockDTO -> !existJdbcRegistryClientIds.contains(jdbcRegistryLockDTO.getClientId()))
.forEach(jdbcRegistryLockDTO -> {
log.info("Remove the JdbcRegistryLock: {} which client is not exist in the registry",
jdbcRegistryLockDTO);
jdbcRegistryLockRepository.deleteById(jdbcRegistryLockDTO.getId());
});
stopWatch.stop();
log.debug("Success purge invalid jdbcRegistryMetadata, cost: {} ms", stopWatch.getTime());
}

private void doPurgeJdbcRegistryClientInDB(List<Long> jdbcRegistryClientIds) {
private void doPurgeJdbcRegistryClientInDB(final List<Long> jdbcRegistryClientIds) {
if (CollectionUtils.isEmpty(jdbcRegistryClientIds)) {
return;
}
Expand Down

0 comments on commit 94e39c6

Please sign in to comment.