Skip to content

Commit

Permalink
Merge branch 'master' into evwillia/er-grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
evanw555 authored Jul 13, 2023
2 parents 6d05d61 + 30a16ab commit 6abf978
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 48 deletions.
26 changes: 25 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.43.7] - 2023-07-11
- Make file extension of D2 ZKFS file store fully customizable.

## [29.43.6] - 2023-07-10
- Enable passing settings to custom partition accessor

## [29.43.5] - 2023-06-27
- Remove a delegated method in LoadBalancerWithFacilitiesDelegator

## [29.43.4] - 2023-06-22
- Refactor ZookeeperServer, making functionality to generate URI properties for node accessible to subclasses

## [29.43.3] - 2023-06-22
- Bump jfrog build-info-extractor-gradle to 4.32.0

## [29.43.2] - 2023-06-21
- Add missing interface method in LoadBalancerWithFacilitiesDelegator

## [29.43.1] - 2023-06-20
- mute SD update receipt event for initial request on a new cluster

Expand Down Expand Up @@ -5481,7 +5499,13 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.43.1...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.43.7...master
[29.43.7]: https://github.com/linkedin/rest.li/compare/v29.43.6...v29.43.7
[29.43.6]: https://github.com/linkedin/rest.li/compare/v29.43.5...v29.43.6
[29.43.5]: https://github.com/linkedin/rest.li/compare/v29.43.4...v29.43.5
[29.43.4]: https://github.com/linkedin/rest.li/compare/v29.43.3...v29.43.4
[29.43.3]: https://github.com/linkedin/rest.li/compare/v29.43.2...v29.43.3
[29.43.2]: https://github.com/linkedin/rest.li/compare/v29.43.1...v29.43.2
[29.43.1]: https://github.com/linkedin/rest.li/compare/v29.43.0...v29.43.1
[29.43.0]: https://github.com/linkedin/rest.li/compare/v29.42.4...v29.43.0
[29.42.4]: https://github.com/linkedin/rest.li/compare/v29.42.3...v29.42.4
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ buildscript {
jcenter()
}
dependencies {
classpath 'org.jfrog.buildinfo:build-info-extractor-gradle:4.21.0'
classpath 'org.jfrog.buildinfo:build-info-extractor-gradle:4.32.0'
classpath 'org.jacoco:org.jacoco.core:0.8.7'
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.properties.ClusterProperties;
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.d2.balancer.util.ClusterInfoProvider;
import com.linkedin.d2.balancer.util.hashing.HashRingProvider;
import com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider;
Expand All @@ -11,6 +13,8 @@
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.transport.common.TransportClientFactory;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import org.apache.commons.lang3.tuple.Pair;


/**
* Abstract class implementing the delegating methods for {@link LoadBalancerWithFacilities}
Expand Down Expand Up @@ -84,4 +88,11 @@ public ServiceProperties getLoadBalancedServiceProperties(String serviceName) th
{
return _loadBalancer.getLoadBalancedServiceProperties(serviceName);
}

@Override
public void getLoadBalancedClusterAndUriProperties(String clusterName,
Callback<Pair<ClusterProperties, UriProperties>> callback)
{
_loadBalancer.getLoadBalancedClusterAndUriProperties(clusterName, callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@

package com.linkedin.d2.balancer.servers;

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.common.util.None;
Expand All @@ -34,13 +26,18 @@
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore;

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.d2.discovery.util.LogUtil.info;
import static com.linkedin.d2.discovery.util.LogUtil.warn;
import static com.linkedin.d2.discovery.util.LogUtil.*;

public class
ZooKeeperServer implements LoadBalancerServer
Expand Down Expand Up @@ -96,19 +93,6 @@ public void markUp(final String clusterName,
@Override
public void onSuccess(None none)
{
Map<URI, Map<Integer, PartitionData>> partitionDesc = new HashMap<>();
partitionDesc.put(uri, partitionDataMap);

Map<URI, Map<String, Object>> myUriSpecificProperties;
if (uriSpecificProperties != null && !uriSpecificProperties.isEmpty())
{
myUriSpecificProperties = new HashMap<>();
myUriSpecificProperties.put(uri, uriSpecificProperties);
}
else
{
myUriSpecificProperties = Collections.emptyMap();
}

if (_log.isInfoEnabled())
{
Expand All @@ -130,7 +114,7 @@ public void onSuccess(None none)
sb.append("}");
info(_log, sb);
}
_store.put(clusterName, new UriProperties(clusterName, partitionDesc, myUriSpecificProperties), callback);
_store.put(clusterName, constructUriPropertiesForNode(clusterName, uri, partitionDataMap, uriSpecificProperties), callback);

}

Expand Down Expand Up @@ -361,17 +345,28 @@ public void shutdown()
{
callback.get(5, TimeUnit.SECONDS);
info(_log, "shutting down complete");
}
catch (TimeoutException e)
{
} catch (TimeoutException e) {
warn(_log, "unable to shut down propertly");
}
catch (InterruptedException | ExecutionException e)
{
} catch (InterruptedException | ExecutionException e) {
warn(_log, "unable to shut down propertly.. got interrupt exception while waiting");
}
}

protected UriProperties constructUriPropertiesForNode(final String clusterName, final URI uri,
final Map<Integer, PartitionData> partitionDataMap, final Map<String, Object> uriSpecificProperties) {
Map<URI, Map<Integer, PartitionData>> partitionDesc = new HashMap<>();
partitionDesc.put(uri, partitionDataMap);

Map<URI, Map<String, Object>> uriToUriSpecificProperties;
if (uriSpecificProperties != null && !uriSpecificProperties.isEmpty()) {
uriToUriSpecificProperties = new HashMap<>();
uriToUriSpecificProperties.put(uri, uriSpecificProperties);
} else {
uriToUriSpecificProperties = Collections.emptyMap();
}
return new UriProperties(clusterName, partitionDesc, uriToUriSpecificProperties);
}

private void storeGet(final String clusterName, final Callback<UriProperties> callback)
{
if (_store == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,39 +44,51 @@ public class FileSystemDirectory

private final String _d2FsDirPath;
private String _d2ServicePath;
private final String _fsFileExtension;

public FileSystemDirectory(String d2FsDirPath, String d2ServicePath)
{
this(d2FsDirPath, d2ServicePath, FILE_STORE_EXTENSION);
}

public FileSystemDirectory(String d2FsDirPath, String d2ServicePath, String fsFileExtension)
{
_d2FsDirPath = d2FsDirPath;
_d2ServicePath = d2ServicePath;
_fsFileExtension = fsFileExtension;
}

public List<String> getServiceNames()
{
return getFileListWithoutExtension(getServiceDirectory(_d2FsDirPath, _d2ServicePath));
return getFileListWithoutExtension(getServiceDirectory(_d2FsDirPath, _d2ServicePath), _fsFileExtension);
}

public void removeAllServicesWithExcluded(Set<String> excludedServices)
{
List<String> serviceNames = getServiceNames();
serviceNames.removeAll(excludedServices);
removeAllPropertiesFromDirectory(getServiceDirectory(_d2FsDirPath, _d2ServicePath), serviceNames);
removeAllPropertiesFromDirectory(getServiceDirectory(_d2FsDirPath, _d2ServicePath), serviceNames, _fsFileExtension);
}

public void removeAllClustersWithExcluded(Set<String> excludedClusters)
{
List<String> serviceNames = getClusterNames();
serviceNames.removeAll(excludedClusters);
removeAllPropertiesFromDirectory(getServiceDirectory(_d2FsDirPath, _d2ServicePath), serviceNames);
removeAllPropertiesFromDirectory(getServiceDirectory(_d2FsDirPath, _d2ServicePath), serviceNames, _fsFileExtension);
}

public static void removeAllPropertiesFromDirectory(String path, List<String> properties)
{
removeAllPropertiesFromDirectory(path, properties, FILE_STORE_EXTENSION);
}

public static void removeAllPropertiesFromDirectory(String path, List<String> properties, String fileExtension)
{
for (String property : properties)
{
try
{
Files.deleteIfExists(Paths.get(path + File.separator + property + FileSystemDirectory.FILE_STORE_EXTENSION));
Files.deleteIfExists(Paths.get(path + File.separator + property + fileExtension));
} catch (IOException e)
{
LOG.warn("IO Error, continuing deletion", e);
Expand All @@ -86,21 +98,27 @@ public static void removeAllPropertiesFromDirectory(String path, List<String> pr

public List<String> getClusterNames()
{
return getFileListWithoutExtension(getClusterDirectory(_d2ServicePath));
return getFileListWithoutExtension(getClusterDirectory(_d2ServicePath),
_fsFileExtension);
}

public static List<String> getFileListWithoutExtension(String path)
{
return getFileListWithoutExtension(path, FILE_STORE_EXTENSION);
}

public static List<String> getFileListWithoutExtension(String path, String fileExtension)
{
File dir = new File(path);
File[] files = dir.listFiles((dir1, name) -> name.endsWith(FileSystemDirectory.FILE_STORE_EXTENSION));
File[] files = dir.listFiles((dir1, name) -> name.endsWith(fileExtension));
if (files == null)
{
return Collections.emptyList();
}

// cleaning the list from the extension
return Arrays.stream(files)
.map(file -> file.getName().replace(FILE_STORE_EXTENSION, ""))
.map(file -> file.getName().replace(fileExtension, ""))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,24 @@ public class FSBasedDownstreamServicesFetcher implements DownstreamServicesFetch
{
private final String _d2FsPath;
private final String _d2ServicePath;
private final String _fsFileExtension;

public FSBasedDownstreamServicesFetcher(String d2FsPath, String d2ServicePath)
{
this(d2FsPath, d2ServicePath, FileSystemDirectory.FILE_STORE_EXTENSION);
}

public FSBasedDownstreamServicesFetcher(String d2FsPath, String d2ServicePath, String fsFileExtension)
{
_d2FsPath = d2FsPath;
_d2ServicePath = d2ServicePath;
_fsFileExtension = fsFileExtension;
}

@Override
public void getServiceNames(SuccessCallback<List<String>> callback)
{
FileSystemDirectory fsDirectory = new FileSystemDirectory(_d2FsPath, _d2ServicePath);
FileSystemDirectory fsDirectory = new FileSystemDirectory(_d2FsPath, _d2ServicePath, _fsFileExtension);
callback.onSuccess(fsDirectory.getServiceNames());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.linkedin.d2.balancer.util.partitions;

import java.net.URI;
import java.util.Objects;


/**
* BasePartitionAccessor returns partitionId according to the given URI.
Expand All @@ -33,5 +35,13 @@ public interface BasePartitionAccessor
* @throws PartitionAccessException see {@link PartitionAccessException}
*/
int getPartitionId(URI uri) throws PartitionAccessException;

/**
* Given the setting of the partition accessor, check if the setting can be supported
* @return true if supportable
*/
default boolean checkSupportable(String settings) {
return Objects.equals(getClass().getSimpleName(), settings);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,22 @@ private static PartitionAccessor buildCustomizedPartitionAccessor(String cluster
return DefaultPartitionAccessor.getInstance();
}

List<String> requestedClassList = customizedProperties.getPartitionAccessorList();
if (requestedClassList == null || requestedClassList.isEmpty())
List<String> partitionAccessorSettingsList = customizedProperties.getPartitionAccessorList();
if (partitionAccessorSettingsList == null || partitionAccessorSettingsList.isEmpty())
{
// If the no classList is defined, use the first class registered
BasePartitionAccessor partitionAccessor = partitionAccessors.get(0);
_log.info("Use customized partitionAccessor for cluster:" + clusterName + ", class: " + partitionAccessor.getClass().getSimpleName()
+ " (out of " + partitionAccessors.size() + ") registration");
return new CustomizedPartitionAccessor(customizedProperties, partitionAccessor);
}
for (String className : requestedClassList)
for (String setting : partitionAccessorSettingsList)
{
for (BasePartitionAccessor accessor : partitionAccessors)
{
if (className.equals(accessor.getClass().getSimpleName()))
if (accessor.checkSupportable(setting))
{
_log.info("Use matched partitionAccessor for cluster: " + clusterName + ", class: " + accessor.getClass().getSimpleName());
_log.info("Use matched partitionAccessor for cluster: " + clusterName + ", class: " + accessor.getClass().getSimpleName() + ", setting: " + setting);
return new CustomizedPartitionAccessor(customizedProperties, accessor);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public Map<String, T> getAll()
List<String> propertyNames;
try
{
propertyNames = FileSystemDirectory.getFileListWithoutExtension(_fsPath);
propertyNames = FileSystemDirectory.getFileListWithoutExtension(_fsPath, _fsFileExtension);

Map<String, T> result = new HashMap<>();
for (String propertyName : propertyNames)
Expand Down
Loading

0 comments on commit 6abf978

Please sign in to comment.