Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bohhyang committed Sep 20, 2023
1 parent a24e6ab commit 072ce1d
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 73 deletions.
1 change: 0 additions & 1 deletion d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public String getVersion()

enum ResourceType
{
// TODO: add D2_SYMLINK_NODE type
UNKNOWN, D2_NODE, D2_SYMLINK_NODE, D2_NODE_MAP;

static ResourceType fromTypeUrl(String typeUrl)
Expand Down
1 change: 0 additions & 1 deletion d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,6 @@ private void handleResponse(DiscoveryResponseData response)
_responseReceived = true;
String respNonce = response.getNonce();
ResourceType resourceType = response.getResourceType();
// TODO: handle D2_SYMLINK_NODE type
switch (resourceType)
{
case D2_NODE:
Expand Down
142 changes: 71 additions & 71 deletions d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,6 @@ public void setClusterEventBus(PropertyEventBus<ClusterProperties> clusterEventB
_clusterEventBus = clusterEventBus;
}

private boolean isSymlinkNode(String nodeNameOrPath)
{
return nodeNameOrPath != null && nodeNameOrPath.indexOf(SYMLINK_NODE_IDENTIFIER) >= 0;
}

public void registerXdsConnectionListener(XdsConnectionListener listener)
{
_xdsConnectionListeners.add(listener);
Expand All @@ -142,7 +137,7 @@ public void listenToCluster(String clusterName)
{
// if cluster name is a symlink, watch for D2SymlinkNode instead
String resourceName = D2_CLUSTER_NODE_PREFIX + clusterName;
if (!checkAndListenToSymlink(clusterName, resourceName))
if (shouldNotListenToSymlink(clusterName, resourceName))
{
_watchedClusterResources.computeIfAbsent(clusterName, k ->
{
Expand All @@ -158,7 +153,7 @@ public void listenToUris(String clusterName)
{
// if cluster name is a symlink, watch for D2SymlinkNode instead
String resourceName = D2_URI_NODE_PREFIX + clusterName;
if (!checkAndListenToSymlink(clusterName, resourceName))
if (shouldNotListenToSymlink(clusterName, resourceName))
{
_watchedUriResources.computeIfAbsent(clusterName, k ->
{
Expand All @@ -170,7 +165,18 @@ public void listenToUris(String clusterName)
}
}

private boolean checkAndListenToSymlink(String symlinkName, String fullResourceName)
public void listenToService(String serviceName)
{
_watchedServiceResources.computeIfAbsent(serviceName, k ->
{
XdsClient.D2NodeResourceWatcher watcher = getServiceResourceWatcher(serviceName);
_xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, XdsClient.ResourceType.D2_NODE,
watcher);
return watcher;
});
}

private boolean shouldNotListenToSymlink(String symlinkName, String fullResourceName)
{
boolean isSymlink = isSymlinkNode(symlinkName);
if (isSymlink)
Expand All @@ -187,18 +193,12 @@ private boolean checkAndListenToSymlink(String symlinkName, String fullResourceN
});
}

return isSymlink;
return !isSymlink;
}

public void listenToService(String serviceName)
private boolean isSymlinkNode(String nodeNameOrPath)
{
_watchedServiceResources.computeIfAbsent(serviceName, k ->
{
XdsClient.D2NodeResourceWatcher watcher = getServiceResourceWatcher(serviceName);
_xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, XdsClient.ResourceType.D2_NODE,
watcher);
return watcher;
});
return nodeNameOrPath != null && nodeNameOrPath.indexOf(SYMLINK_NODE_IDENTIFIER) >= 0;
}

XdsClient.D2NodeResourceWatcher getServiceResourceWatcher(String serviceName)
Expand Down Expand Up @@ -241,6 +241,60 @@ public void onReconnect()
};
}

XdsClient.D2NodeResourceWatcher getClusterResourceWatcher(String clusterName)
{
return new XdsClient.D2NodeResourceWatcher()
{
@Override
public void onChanged(XdsClient.D2NodeUpdate update)
{
if (_clusterEventBus != null)
{
try
{
ClusterProperties clusterProperties = toClusterProperties(update.getNodeData().getData(),
update.getNodeData().getStat().getMzxid());
// For symlink clusters, ClusterLoadBalancerSubscriber subscribed to the symlinks, instead of the actual node, in event bus,
// so we need to publish under the symlink names.
// For other clusters, publish under its original name. Note that these clusters could be either:
// 1) regular clusters requested normally.
// 2) clusters that were pointed by a symlink previously, but no longer the case after the symlink points to other clusters.
// For case #2: the actualResourceToSymlink map will no longer has an entry for this cluster (removed in
// D2SymlinkNodeResourceWatcher::onChanged), thus the updates will be published under the original cluster name
// (like "FooCluster-prod-ltx1"), which has no subscribers anyway, so no harm to publish.
String clusterNameToPublish = _actualNodeToSymlink.getOrDefault(clusterName, clusterName);
_clusterEventBus.publishInitialize(clusterNameToPublish, clusterProperties);
if (_dualReadStateManager != null)
{
_dualReadStateManager.reportData(clusterName, clusterProperties, true);
}
}
catch (InvalidProtocolBufferException | PropertySerializationException e)
{
_log.error("Failed to parse D2 cluster properties from xDS update. Cluster name: " + clusterName, e);
}
}
}

@Override
public void onError(Status error)
{
notifyAvailabilityChanges(false);
}

@Override
public void onReconnect()
{
notifyAvailabilityChanges(true);
}
};
}

XdsClient.D2NodeMapResourceWatcher getUriResourceWatcher(String clusterName)
{
return new UriPropertiesResourceWatcher(clusterName);
}

XdsClient.D2SymlinkNodeResourceWatcher getSymlinkResourceWatcher(String symlinkName)
{
return new XdsClient.D2SymlinkNodeResourceWatcher()
Expand Down Expand Up @@ -306,60 +360,6 @@ private String removeNodePathPrefix(String path, String prefix)
}
}

XdsClient.D2NodeResourceWatcher getClusterResourceWatcher(String clusterName)
{
return new XdsClient.D2NodeResourceWatcher()
{
@Override
public void onChanged(XdsClient.D2NodeUpdate update)
{
if (_clusterEventBus != null)
{
try
{
ClusterProperties clusterProperties = toClusterProperties(update.getNodeData().getData(),
update.getNodeData().getStat().getMzxid());
// For symlink clusters, ClusterLoadBalancerSubscriber subscribed to the symlinks, instead of the actual node, in event bus,
// so we need to publish under the symlink names.
// For other clusters, publish under its original name. Note that these clusters could be either:
// 1) regular clusters requested normally.
// 2) clusters that were pointed by a symlink previously, but no longer the case after the symlink points to other clusters.
// For case #2: the actualResourceToSymlink map will no longer has an entry for this cluster (removed in
// D2SymlinkNodeResourceWatcher::onChanged), thus the updates will be published under the original cluster name
// (like "FooCluster-prod-ltx1"), which has no subscribers anyway, so no harm to publish.
String clusterNameToPublish = _actualNodeToSymlink.getOrDefault(clusterName, clusterName);
_clusterEventBus.publishInitialize(clusterNameToPublish, clusterProperties);
if (_dualReadStateManager != null)
{
_dualReadStateManager.reportData(clusterName, clusterProperties, true);
}
}
catch (InvalidProtocolBufferException | PropertySerializationException e)
{
_log.error("Failed to parse D2 cluster properties from xDS update. Cluster name: " + clusterName, e);
}
}
}

@Override
public void onError(Status error)
{
notifyAvailabilityChanges(false);
}

@Override
public void onReconnect()
{
notifyAvailabilityChanges(true);
}
};
}

XdsClient.D2NodeMapResourceWatcher getUriResourceWatcher(String clusterName)
{
return new UriPropertiesResourceWatcher(clusterName);
}

private void notifyAvailabilityChanges(boolean isAvailable)
{
synchronized (_xdsConnectionListeners)
Expand Down
Loading

0 comments on commit 072ce1d

Please sign in to comment.