Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add warn logs about invalid property versions #958

Merged
merged 6 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.48.8] - 2023-12-19
- add warn logs about invalid property versions

## [29.48.7] - 2023-12-13
- fix publishing uri and cluster properties for symlink clusters

Expand Down Expand Up @@ -5584,7 +5587,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.48.7...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.48.8...master
[29.48.8]: https://github.com/linkedin/rest.li/compare/v29.48.7...v29.48.8
[29.48.7]: https://github.com/linkedin/rest.li/compare/v29.48.6...v29.48.7
[29.48.6]: https://github.com/linkedin/rest.li/compare/v29.48.5...v29.48.6
[29.48.5]: https://github.com/linkedin/rest.li/compare/v29.48.4...v29.48.5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class UriPropertiesMerger implements ZooKeeperPropertyMerger<UriProperties>
{
private static final Logger LOG = LoggerFactory.getLogger(UriPropertiesMerger.class);

@Override
public UriProperties merge(String propertyName, Collection<UriProperties> propertiesToMerge)
{
Expand All @@ -48,6 +53,10 @@ public UriProperties merge(String propertyName, Collection<UriProperties> proper
}
}

if (maxVersion == -1)
{
LOG.warn("Merged Uri properties has invalid version -1. It should be > -1.");
}
return new UriProperties(clusterName, partitionData, uriSpecificProperties, maxVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@
*/
public class ZooKeeperEphemeralStore<T> extends ZooKeeperStore<T>
{
private static final Logger _log =
LoggerFactory.getLogger(ZooKeeperEphemeralStore.class);
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperEphemeralStore.class);
private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/(.*)$");
public static final String DEFAULT_PREFIX = "ephemoral";
public static final String PUT_FAILURE_PATH_SUFFIX = "FAILURE";
Expand Down Expand Up @@ -206,7 +205,7 @@ public ZooKeeperEphemeralStore(ZKConnection client,

if (ephemeralNodesFilePath != null && !useNewWatcher)
{
_log.warn("Forcing enabling useNewWatcher with ephemeralNodesFilePath!=null");
LOG.warn("Forcing enabling useNewWatcher with ephemeralNodesFilePath!=null");
useNewWatcher = true;
}

Expand All @@ -228,7 +227,7 @@ public void put(final String prop, final T value, final Callback<None> callback)
{
_putStats.inc();

trace(_log, "put ", prop, ": ", value);
trace(LOG, "put ", prop, ": ", value);

final String path = getPath(prop);
_zkConn.ensurePersistentNodeExists(path, new Callback<None>()
Expand Down Expand Up @@ -284,7 +283,7 @@ public void remove(String prop, Callback<None> callback)
{
_removeStats.inc();

trace(_log, "remove: ", prop);
trace(LOG, "remove: ", prop);

String path = getPath(prop);
_zkConn.removeNodeUnsafeRecursive(path, callback);
Expand All @@ -302,7 +301,7 @@ public void removePartial(final String prop, final T value, final Callback<None>
{
final String path = getPath(prop);

trace(_log, "remove partial ", prop, ": ", value);
trace(LOG, "remove partial ", prop, ": ", value);

final Callback<Map<String, T>> childrenCallback = new Callback<Map<String, T>>()
{
Expand Down Expand Up @@ -352,7 +351,7 @@ public void processResult(int rc, String path, Object ctx, List<String> children
}
else
{
_log.warn("Ignoring request to removePartial with no children: {}", path);
LOG.warn("Ignoring request to removePartial with no children: {}", path);
callback.onSuccess(None.none());
}
break;
Expand Down Expand Up @@ -401,7 +400,7 @@ private void getMergedChildren(String path, List<String> children, ZKStoreWatche
final String propertyName = getPropertyForPath(path);
if (children.size() > 0)
{
_log.debug("getMergedChildren: collecting {}", children);
LOG.debug("getMergedChildren: collecting {}", children);
ChildCollector collector = new ChildCollector(children.size(), new CallbackAdapter<T, Map<String, T>>(callback)
{
@Override
Expand All @@ -417,7 +416,7 @@ protected T convertResponse(Map<String, T> response) throws Exception
}
else
{
_log.debug("getMergedChildren: no children");
LOG.debug("getMergedChildren: no children");
callback.onSuccess(_merger.merge(propertyName, Collections.emptyList()));
}
}
Expand All @@ -430,21 +429,21 @@ private void getChildrenData(String path, Collection<String> children, Callback<
{
if (children.size() > 0)
{
_log.debug("getChildrenData: collecting {}", children);
LOG.debug("getChildrenData: collecting {}", children);
ChildCollector collector = new ChildCollector(children.size(), callback);
children.forEach(child -> _zk.getData(path + "/" + child, null, collector, null));
}
else
{
_log.debug("getChildrenData: no children");
LOG.debug("getChildrenData: no children");
callback.onSuccess(Collections.emptyMap());
}
}

@Override
public void startPublishing(final String prop)
{
trace(_log, "register: ", prop);
trace(LOG, "register: ", prop);

if (_eventBus == null)
{
Expand Down Expand Up @@ -483,7 +482,7 @@ public void startPublishing(final String prop)
@Override
public void stopPublishing(String prop)
{
trace(_log, "unregister: ", prop);
trace(LOG, "unregister: ", prop);

if (_useNewWatcher)
{
Expand Down Expand Up @@ -609,7 +608,7 @@ public void processWatch(final String propertyName, WatchedEvent watchedEvent)
public void processResult(int rc, final String path, Object ctx, List<String> children)
{
KeeperException.Code code = KeeperException.Code.get(rc);
_log.debug("{}: getChildren returned {}: {}", new Object[]{path, code, children});
LOG.debug("{}: getChildren returned {}: {}", new Object[]{path, code, children});
final boolean init = (Boolean)ctx;
final String property = getPropertyForPath(path);
switch (code)
Expand All @@ -623,23 +622,23 @@ public void onSuccess(T value)
if (init)
{
_eventBus.publishInitialize(property, value);
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}
else
{
_eventBus.publishAdd(property, value);
_log.debug("{}: published add", path);
LOG.debug("{}: published add", path);
}
}

@Override
public void onError(Throwable e)
{
_log.error("Failed to merge children for path " + path, e);
LOG.error("Failed to merge children for path " + path, e);
if (init)
{
_eventBus.publishInitialize(property, null);
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}
}
});
Expand All @@ -648,22 +647,22 @@ public void onError(Throwable e)

case NONODE:
// The node whose children we are monitoring is gone; set an exists watch on it
_log.debug("{}: node is not present, calling exists", path);
LOG.debug("{}: node is not present, calling exists", path);
_zk.exists(path, this, this, false);
if (init)
{
_eventBus.publishInitialize(property, null);
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}
else
{
_eventBus.publishRemove(property);
_log.debug("{}: published remove", path);
LOG.debug("{}: published remove", path);
}
break;

default:
_log.error("getChildren: unexpected error: {}: {}", code, path);
LOG.error("getChildren: unexpected error: {}: {}", code, path);
break;
}
}
Expand All @@ -675,22 +674,22 @@ public void onError(Throwable e)
public void processResult(int rc, String path, Object ctx, Stat stat)
{
KeeperException.Code code = KeeperException.Code.get(rc);
_log.debug("{}: exists returned {}", path, code);
LOG.debug("{}: exists returned {}", path, code);
switch (code)
{
case OK:
// The node is back, get children and set child watch
_log.debug("{}: calling getChildren", path);
LOG.debug("{}: calling getChildren", path);
_zk.getChildren(path, this, this, false);
break;

case NONODE:
// The node doesn't exist; OK, the watch is set so now we wait.
_log.debug("{}: set exists watch", path);
LOG.debug("{}: set exists watch", path);
break;

default:
_log.error("exists: unexpected error: {}: {}", code, path);
LOG.error("exists: unexpected error: {}: {}", code, path);
break;
}

Expand Down Expand Up @@ -758,7 +757,7 @@ protected void processWatch(String propertyName, WatchedEvent event)
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat)
{
KeeperException.Code code = KeeperException.Code.get(rc);
_log.debug("{}: getChildren returned {}: {}", new Object[]{path, code, children});
LOG.debug("{}: getChildren returned {}: {}", new Object[]{path, code, children});
final boolean init = (Boolean)ctx;
final String property = getPropertyForPath(path);
switch (code)
Expand All @@ -785,17 +784,17 @@ public void processResult(int rc, String path, Object ctx, List<String> children
}
_isInitialFetchRef.set(true); // set isInitialFetch to true so that when the exists watch is triggered, it's an initial fetch.
_initialFetchStartAtNanosRef.set(System.nanoTime());
_log.debug("{}: node is not present, calling exists", path);
LOG.debug("{}: node is not present, calling exists", path);
_zk.exists(path, this, this, false);
if (init)
{
_eventBus.publishInitialize(property, null);
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}
else
{
_eventBus.publishRemove(property);
_log.debug("{}: published remove", path);
LOG.debug("{}: published remove", path);
}
if (_fileStore != null)
{
Expand All @@ -804,7 +803,7 @@ public void processResult(int rc, String path, Object ctx, List<String> children
break;

default:
_log.error("getChildren: unexpected error: {}: {}", code, path);
LOG.error("getChildren: unexpected error: {}: {}", code, path);
break;
}
}
Expand All @@ -816,12 +815,12 @@ private Callback<Map<String, T>> getChildrenDataCallback(String path, boolean in
@Override
public void onError(Throwable e)
{
_log.error("Failed to merge children for path " + path, e);
LOG.error("Failed to merge children for path " + path, e);
if (init)
{
_eventBus.publishInitialize(property, null);
}
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}

@Override
Expand All @@ -843,12 +842,12 @@ public void onSuccess(Map<String, T> result)
if (init)
{
_eventBus.publishInitialize(property, mergedProperty);
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}
else
{
_eventBus.publishAdd(property, mergedProperty);
_log.debug("{}: published add", path);
LOG.debug("{}: published add", path);
}
}
};
Expand Down Expand Up @@ -920,22 +919,22 @@ private Set<String> calculateChildrenDeltaAndUpdateState(List<String> children,
public void processResult(int rc, String path, Object ctx, Stat stat)
{
KeeperException.Code code = KeeperException.Code.get(rc);
_log.debug("{}: exists returned {}", path, code);
LOG.debug("{}: exists returned {}", path, code);
switch (code)
{
case OK:
// The node is back, get children and set child watch
_log.debug("{}: calling getChildren", path);
LOG.debug("{}: calling getChildren", path);
_zk.getChildren(path, this, this, false);
break;

case NONODE:
// The node doesn't exist; OK, the watch is set so now we wait.
_log.debug("{}: set exists watch", path);
LOG.debug("{}: set exists watch", path);
break;

default:
_log.error("exists: unexpected error: {}: {}", code, path);
LOG.error("exists: unexpected error: {}: {}", code, path);
break;
}
}
Expand All @@ -944,15 +943,15 @@ private void emitSDStatusInitialRequestEvent(String property, boolean succeeded)
{
if (_eventEmitter == null)
{
_log.info("Service discovery event emitter in ZookeeperEphemeralStore is null. Skipping emitting events.");
LOG.info("Service discovery event emitter in ZookeeperEphemeralStore is null. Skipping emitting events.");
return;
}

// measure request duration and convert to milli-seconds
long initialFetchDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - _initialFetchStartAtNanosRef.get());
if (initialFetchDurationMillis < 0)
{
_log.warn("Failed to log ServiceDiscoveryStatusInitialRequest event, initialFetchStartAt time is greater than current time.");
LOG.warn("Failed to log ServiceDiscoveryStatusInitialRequest event, initialFetchStartAt time is greater than current time.");
return;
}
// emit service discovery status initial request event for success
Expand All @@ -963,7 +962,7 @@ private void emitSDStatusUpdateReceiptEvents(Map<String, T> updates, boolean isM
{
if (_eventEmitter == null)
{
_log.info("Service discovery event emitter in ZookeeperEphemeralStore is null. Skipping emitting events.");
LOG.info("Service discovery event emitter in ZookeeperEphemeralStore is null. Skipping emitting events.");
return;
}

Expand All @@ -972,7 +971,7 @@ private void emitSDStatusUpdateReceiptEvents(Map<String, T> updates, boolean isM
{
if (!(uriProperty instanceof UriProperties))
{
_log.error("Unknown type of URI data, ignored: " + uriProperty.toString());
LOG.error("Unknown type of URI data, ignored: " + uriProperty.toString());
return;
}
UriProperties properties = (UriProperties) uriProperty;
Expand Down Expand Up @@ -1027,7 +1026,12 @@ public void processResult(int rc, String s, Object o, byte[] bytes, Stat stat)
try
{
String childPath = s.substring(s.lastIndexOf('/') + 1);
T value = _serializer.fromBytes(bytes, stat.getMzxid());
long version = stat.getMzxid();
if (version <= 0)
{
LOG.warn("ZK data from {} has invalid version: {}", s, version);
}
T value = _serializer.fromBytes(bytes, version);
_properties.put(childPath, value);
if (_count == 0)
{
Expand All @@ -1046,7 +1050,7 @@ public void processResult(int rc, String s, Object o, byte[] bytes, Stat stat)
{
_callback.onSuccess(_properties);
}
_log.debug("{} doesn't exist, count={}", s, _count);
LOG.debug("{} doesn't exist, count={}", s, _count);
break;

default:
Expand Down
Loading
Loading