Skip to content

Commit

Permalink
Allow PutOperation to use dynamic success target (#1337)
Browse files Browse the repository at this point in the history
In the intermediate state of "move replica", when decommission of old
replicas has been initiated (but they have not yet transitioned to INACTIVE),
PUT requests should be rejected on the old replicas. Frontends see both
old and new replicas (assume 3 old and 3 new), so the success target should
be 6 - 1 = 5. In the aforementioned scenario, the PUT request fails on the 3 old
replicas and we would be supposed to fail the whole PUT operation. However, the PUT
request is highly likely to succeed on the 3 new replicas, and we could actually
consider it a success without generating a "slip put" (which makes PUT
latency worse).
The new success criterion is:
 success count >= Math.max(total replica count - disabled count - 1, 2)
The 2 here is the original PUT success target, which is the lower bound of
the PUT operation's success target.
  • Loading branch information
jsjtzyy authored and cgtz committed Jan 6, 2020
1 parent 443679b commit 9873671
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 44 deletions.
10 changes: 10 additions & 0 deletions ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public class RouterConfig {
public static final String ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY = "router.get.blob.operation.share.memory";
public static final String ROUTER_GET_ELIGIBLE_REPLICAS_BY_STATE_ENABLED =
"router.get.eligible.replicas.by.state.enabled";
public static final String ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET = "router.put.use.dynamic.success.target";

/**
* Number of independent scaling units for the router.
*/
Expand Down Expand Up @@ -453,6 +455,13 @@ public class RouterConfig {
@Default("false")
public final boolean routerGetEligibleReplicasByStateEnabled;

/**
* Whether to use dynamic success target for put operation in router.
*/
@Config(ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET)
@Default("false")
public final boolean routerPutUseDynamicSuccessTarget;

/**
* Create a RouterConfig instance.
* @param verifiableProperties the properties map to refer to.
Expand Down Expand Up @@ -552,5 +561,6 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
routerGetBlobOperationShareMemory = verifiableProperties.getBoolean(ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY, false);
routerGetEligibleReplicasByStateEnabled =
verifiableProperties.getBoolean(ROUTER_GET_ELIGIBLE_REPLICAS_BY_STATE_ENABLED, false);
routerPutUseDynamicSuccessTarget = verifiableProperties.getBoolean(ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ public void cleanUp() {
for (ReplicaId replicaId : replicaIds) {
((MockReplicaId) replicaId).cleanup();
}
replicaIds.clear();
replicaAndState.clear();
}

public void onPartitionReadOnly() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,13 @@ void fillChunks() {
}
}

/**
 * Copies the put chunks currently held by this operation into a new list, so the caller receives a list that is
 * separate from the internal collection.
 * @return a new {@link List} containing the current put chunks.
 */
List<PutChunk> getPutChunks() {
  final List<PutChunk> snapshot = new ArrayList<>(putChunks);
  return snapshot;
}

/**
* Called whenever the channel has data but no free or building chunk is available to be filled.
*/
Expand Down Expand Up @@ -1064,6 +1071,13 @@ boolean isComplete() {
return state == ChunkState.Complete;
}

/**
 * Exposes the operation tracker that this put chunk is currently using.
 * @return the {@link OperationTracker} in use by this put chunk.
 */
OperationTracker getOperationTrackerInUse() {
  final OperationTracker trackerInUse = operationTracker;
  return trackerInUse;
}

/**
* Prepare this chunk for building, that is, for being filled with data from the channel.
* @param chunkIndex the position in the overall blob that this chunk is going to be in.
Expand Down Expand Up @@ -1279,7 +1293,7 @@ private void cleanupExpiredInFlightRequests(RequestRegistrationCallback<PutOpera
int correlationId = entry.getKey();
ChunkPutRequestInfo info = entry.getValue();
if (time.milliseconds() - info.startTimeMs > routerConfig.routerRequestTimeoutMs) {
onErrorResponse(info.replicaId);
onErrorResponse(info.replicaId, TrackedRequestFinalState.FAILURE);
logger.debug("PutRequest with correlationId {} in flight has expired for replica {} ", correlationId,
info.replicaId.getDataNodeId());
// Do not notify this as a failure to the response handler, as this timeout could simply be due to
Expand Down Expand Up @@ -1383,11 +1397,13 @@ void handleResponse(ResponseInfo responseInfo, PutResponse putResponse) {
routerMetrics.getDataNodeBasedMetrics(chunkPutRequestInfo.replicaId.getDataNodeId()).putRequestLatencyMs.update(
requestLatencyMs);
boolean isSuccessful;
TrackedRequestFinalState putRequestFinalState = null;
if (responseInfo.getError() != null) {
logger.debug("PutRequest with response correlationId {} timed out for replica {} ", correlationId,
chunkPutRequestInfo.replicaId.getDataNodeId());
setChunkException(new RouterException("Operation timed out", RouterErrorCode.OperationTimedOut));
isSuccessful = false;
putRequestFinalState = TrackedRequestFinalState.FAILURE;
} else {
if (putResponse == null) {
logger.debug(
Expand All @@ -1396,6 +1412,7 @@ void handleResponse(ResponseInfo responseInfo, PutResponse putResponse) {
setChunkException(new RouterException("Response deserialization received an unexpected error",
RouterErrorCode.UnexpectedInternalError));
isSuccessful = false;
putRequestFinalState = TrackedRequestFinalState.FAILURE;
} else {
if (putResponse.getCorrelationId() != correlationId) {
// The NetworkClient associates a response with a request based on the fact that only one request is sent
Expand All @@ -1408,6 +1425,7 @@ void handleResponse(ResponseInfo responseInfo, PutResponse putResponse) {
setChunkException(
new RouterException("Unexpected internal error", RouterErrorCode.UnexpectedInternalError));
isSuccessful = false;
putRequestFinalState = TrackedRequestFinalState.FAILURE;
// we do not notify the ResponseHandler responsible for failure detection as this is an unexpected error.
} else {
ServerErrorCode putError = putResponse.getError();
Expand All @@ -1421,6 +1439,9 @@ void handleResponse(ResponseInfo responseInfo, PutResponse putResponse) {
chunkPutRequestInfo.replicaId, putResponse.getError(), putResponse.getCorrelationId(), blobId);
processServerError(putResponse.getError());
isSuccessful = false;
putRequestFinalState =
putError == ServerErrorCode.Temporarily_Disabled ? TrackedRequestFinalState.REQUEST_DISABLED
: TrackedRequestFinalState.FAILURE;
}
}
}
Expand All @@ -1433,17 +1454,19 @@ void handleResponse(ResponseInfo responseInfo, PutResponse putResponse) {
routerMetrics.crossColoSuccessCount.inc();
}
} else {
onErrorResponse(chunkPutRequestInfo.replicaId);
onErrorResponse(chunkPutRequestInfo.replicaId, putRequestFinalState);
}
checkAndMaybeComplete();
}

/**
* Perform the necessary actions when a request to a replica fails: report the given final state to the operation
* tracker, then increment {@code routerRequestErrorCount} and the data node's {@code putRequestErrorCount}.
* @param replicaId the {@link ReplicaId} associated with the failed response.
* @param putRequestFinalState the {@link TrackedRequestFinalState} to report to the operation tracker for the failed
*                             request.
*/
// NOTE(review): the next two lines look like the pre-change signature and tracker call retained by the diff view;
// the two-argument signature that follows is the one introduced by this change.
private void onErrorResponse(ReplicaId replicaId) {
operationTracker.onResponse(replicaId, TrackedRequestFinalState.FAILURE);
private void onErrorResponse(ReplicaId replicaId, TrackedRequestFinalState putRequestFinalState) {
// For Put, the final state could be TIMED_OUT, REQUEST_DISABLED or FAILURE (the callers visible in this diff pass
// only FAILURE and REQUEST_DISABLED).
operationTracker.onResponse(replicaId, putRequestFinalState);
routerMetrics.routerRequestErrorCount.inc();
routerMetrics.getDataNodeBasedMetrics(replicaId.getDataNodeId()).putRequestErrorCount.inc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ class SimpleOperationTracker implements OperationTracker {
protected int inflightCount = 0;
protected int succeededCount = 0;
protected int failedCount = 0;
protected int disabledCount = 0;

// How many NotFound responses from originating dc will terminate the operation.
// It's decided by the success target of each mutation operations, including put, delete, update ttl etc.
protected int originatingDcNotFoundFailureThreshold = 0;
protected int originatingDcNotFoundCount = 0;

private final OpTrackerIterator otIterator;
private final RouterOperation routerOperation;
private final RouterConfig routerConfig;
private Iterator<ReplicaId> replicaIterator;
private static final Logger logger = LoggerFactory.getLogger(SimpleOperationTracker.class);

Expand Down Expand Up @@ -120,6 +123,8 @@ class SimpleOperationTracker implements OperationTracker {
boolean crossColoEnabled = false;
boolean includeNonOriginatingDcReplicas = true;
int numOfReplicasRequired = Integer.MAX_VALUE;
this.routerConfig = routerConfig;
this.routerOperation = routerOperation;
datacenterName = routerConfig.routerDatacenterName;
List<ReplicaId> eligibleReplicas;
switch (routerOperation) {
Expand Down Expand Up @@ -238,9 +243,29 @@ class SimpleOperationTracker implements OperationTracker {
routerOperation, successTarget, parallelism, originatingDcNotFoundFailureThreshold);
}

/**
* Determines whether the tracked operation has succeeded.
*
* The dynamic success target is introduced mainly for the following use case:
* In the intermediate state of "move replica", when decommission of old replicas has been initiated (but they have
* not yet transitioned to INACTIVE), PUT requests should be rejected on the old replicas. Frontends see both old and
* new replicas (let's say 3 old and 3 new), so the success target should be 6 - 1 = 5. In the aforementioned
* scenario, the PUT request fails on the 3 old replicas. It seems we should fail the whole PUT operation because the
* number of remaining requests is already less than the success target.
* From another point of view, however, the PUT request is highly likely to succeed on the 3 new replicas and we could
* actually consider it a success without generating a "slip put" (which makes PUT latency worse). The reason is that,
* if new PUTs have already succeeded on at least 2 new replicas, read-after-write should always succeed because
* frontends are always able to see the new replicas, and subsequent READ/DELETE/TtlUpdate requests should succeed on
* at least those 2 new replicas.
*
* @return for a PUT operation with {@code routerPutUseDynamicSuccessTarget} enabled, {@code true} once
*         {@code succeededCount} reaches both {@code totalReplicaCount - disabledCount - 1} and
*         {@code routerPutSuccessTarget}; otherwise {@code true} once {@code succeededCount} reaches
*         {@code successTarget}.
*/
@Override
public boolean hasSucceeded() {
// NOTE(review): the next line looks like the pre-change implementation retained by the diff view; the statements
// after it are the replacement introduced by this change.
return succeededCount >= successTarget;
boolean hasSucceeded;
if (routerOperation == RouterOperation.PutOperation && routerConfig.routerPutUseDynamicSuccessTarget) {
// Every disabled response lowers the required success count by one, but never below the configured PUT success
// target.
hasSucceeded =
succeededCount >= Math.max(totalReplicaCount - disabledCount - 1, routerConfig.routerPutSuccessTarget);
} else {
hasSucceeded = succeededCount >= successTarget;
}
return hasSucceeded;
}

@Override
Expand All @@ -257,17 +282,28 @@ public boolean isDone() {
/**
* Records the outcome of one request: decrements the in-flight count and then updates the success, disabled or
* failure counters according to the given final state.
* @param replicaId the {@link ReplicaId} the response is associated with.
* @param trackedRequestFinalState the {@link TrackedRequestFinalState} of the request sent to that replica.
*/
@Override
public void onResponse(ReplicaId replicaId, TrackedRequestFinalState trackedRequestFinalState) {
inflightCount--;
// NOTE(review): the if/else that follows (up to the switch statement) looks like the pre-change implementation
// retained by the diff view; the switch statement is the replacement introduced by this change.
if (trackedRequestFinalState == TrackedRequestFinalState.SUCCESS) {
succeededCount++;
} else {
failedCount++;
// NOT_FOUND is a special error. When tracker sees >= numReplicasInOriginatingDc - 1 "NOT_FOUND" from the
// originating DC, we can be sure the operation will end up with a NOT_FOUND error.
if (trackedRequestFinalState == TrackedRequestFinalState.NOT_FOUND && replicaId.getDataNodeId()
.getDatacenterName()
.equals(originatingDcName)) {
originatingDcNotFoundCount++;
}
switch (trackedRequestFinalState) {
case SUCCESS:
succeededCount++;
break;
// Request disabled may happen when PUT/DELETE/TTLUpdate requests are attempted on replicas that are being
// decommissioned (i.e. STANDBY -> INACTIVE). This is because decommission may take some time and frontends still
// hold the old view. Such requests are rejected by the server with a Temporarily_Disabled error. For
// DELETE/TTLUpdate, even though we may receive such errors, the success target is still the same (=2). For PUT, we
// have to adjust the success target (quorum) to let some PUT operations (with at least 2 requests succeeded on new
// replicas) succeed.
// Currently, disabledCount only applies to PUT operation.
case REQUEST_DISABLED:
// A disabled response is counted separately and does not increment failedCount.
disabledCount++;
break;
default:
// Every other final state is counted as a failure.
failedCount++;
// NOT_FOUND is a special error. When tracker sees >= numReplicasInOriginatingDc - 1 "NOT_FOUND" from the
// originating DC, we can be sure the operation will end up with a NOT_FOUND error.
if (trackedRequestFinalState == TrackedRequestFinalState.NOT_FOUND && replicaId.getDataNodeId()
.getDatacenterName()
.equals(originatingDcName)) {
originatingDcNotFoundCount++;
}
}
}

Expand Down Expand Up @@ -313,7 +349,14 @@ private List<ReplicaId> getEligibleReplicas(PartitionId partitionId, String dcNa
}

/**
* Determines whether the tracked operation can no longer succeed.
* For a PUT operation with {@code routerPutUseDynamicSuccessTarget} enabled, the operation has failed when
* {@code totalReplicaCount - failedCount} is below either {@code totalReplicaCount - 1} (i.e. more than one request
* has failed) or {@code routerPutSuccessTarget + disabledCount} (i.e. the replicas that are neither failed nor
* disabled can no longer reach the PUT success target). In every other case the operation has failed when the
* replicas that have not failed are fewer than {@code successTarget}, or when {@code hasFailedOnNotFound()} reports
* failure.
* @return {@code true} if the operation has failed, {@code false} otherwise.
*/
private boolean hasFailed() {
// NOTE(review): the next line looks like the pre-change implementation retained by the diff view; the statements
// after it are the replacement introduced by this change.
return (totalReplicaCount - failedCount) < successTarget || hasFailedOnNotFound();
boolean hasFailed;
if (routerOperation == RouterOperation.PutOperation && routerConfig.routerPutUseDynamicSuccessTarget) {
// NOTE(review): hasFailedOnNotFound() is not consulted in this branch - presumably because NOT_FOUND does not
// apply to PUT; confirm.
hasFailed = totalReplicaCount - failedCount < Math.max(totalReplicaCount - 1,
routerConfig.routerPutSuccessTarget + disabledCount);
} else {
hasFailed = (totalReplicaCount - failedCount) < successTarget || hasFailedOnNotFound();
}
return hasFailed;
}

/**
Expand All @@ -323,6 +366,20 @@ public int getSuccessTarget() {
return successTarget;
}

/**
 * Reports how many responses this tracker has recorded as request-disabled.
 * @return the current value of the disabled counter.
 */
int getDisabledCount() {
  return this.disabledCount;
}

/**
 * Reports how many responses this tracker has recorded as failed.
 * @return the current value of the failed counter.
 */
int getFailedCount() {
  return this.failedCount;
}

/**
* Helper function to catch a potential race condition in {@link SimpleOperationTracker#SimpleOperationTracker(RouterConfig, RouterOperation, PartitionId, String, boolean)}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* consumed by operation tracker to change success/failure counter and determine whether to update Histograms.
*/
public enum TrackedRequestFinalState {
SUCCESS, FAILURE, TIMED_OUT, NOT_FOUND;
SUCCESS, FAILURE, TIMED_OUT, NOT_FOUND, REQUEST_DISABLED;

/**
* Return the corresponding {@link TrackedRequestFinalState} for the given {@link RouterErrorCode}.
Expand Down
Loading

0 comments on commit 9873671

Please sign in to comment.