Skip to content

Commit

Permalink
Support d2 symlink node in indis flow (#933)
Browse files Browse the repository at this point in the history
* support d2 symlink node in indis flow

* add unit tests

* address comments

* for symlink clusters, also publish under the origin cluster name
  • Loading branch information
bohhyang authored Oct 2, 2023
1 parent 854dac8 commit c5cff52
Show file tree
Hide file tree
Showing 7 changed files with 488 additions and 20 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.46.5] - 2023-10-02
- support d2 symlink in indis flow

## [29.46.4] - 2023-09-27
- Conduct a more thorough search and fix the remaining ByteBuffer errors to be compatible with Java 8 runtimes.

Expand Down Expand Up @@ -5542,7 +5545,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.4...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.5...master
[29.46.5]: https://github.com/linkedin/rest.li/compare/v29.45.1...v29.45.2
[29.46.4]: https://github.com/linkedin/rest.li/compare/v29.46.3...v29.46.4
[29.46.3]: https://github.com/linkedin/rest.li/compare/v29.46.2...v29.46.3
[29.46.2]: https://github.com/linkedin/rest.li/compare/v29.46.1...v29.46.2
Expand Down
36 changes: 35 additions & 1 deletion d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
public abstract class XdsClient
{
private static final String D2_NODE_TYPE_URL = "type.googleapis.com/indis.D2Node";
private static final String D2_SYMLINK_NODE_TYPE_URL = "type.googleapis.com/indis.D2SymlinkNode";
private static final String D2_NODE_MAP_TYPE_URL = "type.googleapis.com/indis.D2NodeMap";

interface ResourceWatcher
Expand All @@ -44,6 +45,11 @@ interface D2NodeResourceWatcher extends ResourceWatcher
void onChanged(D2NodeUpdate update);
}

interface D2SymlinkNodeResourceWatcher extends ResourceWatcher
{
void onChanged(String resourceName, D2SymlinkNodeUpdate update);
}

interface D2NodeMapResourceWatcher extends ResourceWatcher
{
void onChanged(D2NodeMapUpdate update);
Expand Down Expand Up @@ -76,6 +82,28 @@ public String getVersion()
}
}

static final class D2SymlinkNodeUpdate implements ResourceUpdate
{
String _version;
XdsD2.D2SymlinkNode _nodeData;

D2SymlinkNodeUpdate(String version, XdsD2.D2SymlinkNode nodeData)
{
_version = version;
_nodeData = nodeData;
}

XdsD2.D2SymlinkNode getNodeData()
{
return _nodeData;
}

public String getVersion()
{
return _version;
}
}

static final class D2NodeMapUpdate implements ResourceUpdate
{
String _version;
Expand All @@ -100,14 +128,18 @@ public String getVersion()

enum ResourceType
{
UNKNOWN, D2_NODE, D2_NODE_MAP;
UNKNOWN, D2_NODE, D2_SYMLINK_NODE, D2_NODE_MAP;

static ResourceType fromTypeUrl(String typeUrl)
{
if (typeUrl.equals(D2_NODE_TYPE_URL))
{
return D2_NODE;
}
if (typeUrl.equals(D2_SYMLINK_NODE_TYPE_URL))
{
return D2_SYMLINK_NODE;
}
if (typeUrl.equals(D2_NODE_MAP_TYPE_URL))
{
return D2_NODE_MAP;
Expand All @@ -121,6 +153,8 @@ String typeUrl()
{
case D2_NODE:
return D2_NODE_TYPE_URL;
case D2_SYMLINK_NODE:
return D2_SYMLINK_NODE_TYPE_URL;
case D2_NODE_MAP:
return D2_NODE_MAP_TYPE_URL;
case UNKNOWN:
Expand Down
31 changes: 31 additions & 0 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class XdsClientImpl extends XdsClient
private static final Logger _log = LoggerFactory.getLogger(XdsClientImpl.class);

private final Map<String, ResourceSubscriber> _d2NodeSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> _d2SymlinkNodeSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> _d2NodeMapSubscribers = new HashMap<>();

private final Node _node;
Expand Down Expand Up @@ -143,6 +144,28 @@ private void handleD2NodeResponse(DiscoveryResponseData data)
handleResourceUpdate(updates, data.getResourceType(), data.getNonce(), errors);
}

private void handleD2SymlinkNodeResponse(DiscoveryResponseData data)
{
Map<String, D2SymlinkNodeUpdate> updates = new HashMap<>();
List<String> errors = new ArrayList<>();

for (Resource resource: data.getResourcesList())
{
String resourceName = resource.getName();
try
{
XdsD2.D2SymlinkNode symlinkNode = resource.getResource().unpack(XdsD2.D2SymlinkNode.class);
updates.put(resourceName, new D2SymlinkNodeUpdate(resource.getVersion(), symlinkNode));
} catch (InvalidProtocolBufferException e)
{
_log.warn("Failed to unpack D2SymlinkNode response", e);
errors.add("Failed to unpack D2SymlinkNode response");
}
}

handleResourceUpdate(updates, data.getResourceType(), data.getNonce(), errors);
}

private void handleD2NodeMapResponse(DiscoveryResponseData data)
{
Map<String, D2NodeMapUpdate> updates = new HashMap<>();
Expand Down Expand Up @@ -214,6 +237,8 @@ private Map<String, ResourceSubscriber> getResourceSubscriberMap(ResourceType ty
{
case D2_NODE:
return _d2NodeSubscribers;
case D2_SYMLINK_NODE:
return _d2SymlinkNodeSubscribers;
case D2_NODE_MAP:
return _d2NodeMapSubscribers;
case UNKNOWN:
Expand Down Expand Up @@ -257,6 +282,9 @@ private void notifyWatcher(ResourceWatcher watcher, ResourceUpdate update)
case D2_NODE:
((D2NodeResourceWatcher) watcher).onChanged((D2NodeUpdate) update);
break;
case D2_SYMLINK_NODE:
((D2SymlinkNodeResourceWatcher) watcher).onChanged(_resource, (D2SymlinkNodeUpdate) update);
break;
case D2_NODE_MAP:
((D2NodeMapResourceWatcher) watcher).onChanged((D2NodeMapUpdate) update);
break;
Expand Down Expand Up @@ -524,6 +552,9 @@ private void handleResponse(DiscoveryResponseData response)
case D2_NODE:
handleD2NodeResponse(response);
break;
case D2_SYMLINK_NODE:
handleD2SymlinkNodeResponse(response);
break;
case D2_NODE_MAP:
handleD2NodeMapResponse(response);
break;
Expand Down
Loading

0 comments on commit c5cff52

Please sign in to comment.