Skip to content

Commit

Permalink
Move id converter to router.deleteBlob
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophie Guo committed Oct 2, 2024
1 parent ad1cc58 commit ab0a10d
Show file tree
Hide file tree
Showing 15 changed files with 175 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public Future<String> stitchBlob(BlobProperties blobProperties, byte[] userMetad
}

@Override
public Future<Void> deleteBlob(String blobId, String serviceId, Callback<Void> callback,
public Future<Void> deleteBlob(RestRequest restRequest, String blobId, String serviceId, Callback<Void> callback,
QuotaChargeCallback quotaChargeCallback) {
lock.lock();
try {
Expand Down
2 changes: 2 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ public static final class InternalKeys {
* content-length header
*/
public static final String CONTENT_RANGE_LENGTH = KEY_PREFIX + "content-range-length";

public static final String BLOB_ID = KEY_PREFIX + "blob-id";
}

/**
Expand Down
12 changes: 7 additions & 5 deletions ambry-api/src/main/java/com/github/ambry/router/Router.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ Future<String> stitchBlob(BlobProperties blobProperties, byte[] userMetadata, Li

/**
* Requests for a blob to be deleted asynchronously and invokes the {@link Callback} when the request completes.
* @param blobId The ID of the blob that needs to be deleted.
* @param serviceId The service ID of the service deleting the blob. This can be null if unknown.
* @param callback The {@link Callback} which will be invoked on the completion of a request.
*
* @param restRequest The {@link RestRequest} to delete the blob.
* @param blobId The ID of the blob that needs to be deleted.
* @param serviceId The service ID of the service deleting the blob. This can be null if unknown.
* @param callback The {@link Callback} which will be invoked on the completion of a request.
* @param quotaChargeCallback Listener interface to charge quota cost for the operation.
* @return A future that would contain information about whether the deletion succeeded or not, eventually.
*/
Future<Void> deleteBlob(String blobId, String serviceId, Callback<Void> callback,
Future<Void> deleteBlob(RestRequest restRequest, String blobId, String serviceId, Callback<Void> callback,
QuotaChargeCallback quotaChargeCallback);

/**
Expand Down Expand Up @@ -212,7 +214,7 @@ default CompletableFuture<String> putBlob(BlobProperties blobProperties, byte[]
*/
default CompletableFuture<Void> deleteBlob(String blobId, String serviceId) {
CompletableFuture<Void> future = new CompletableFuture<>();
deleteBlob(blobId, serviceId, CallbackUtils.fromCompletableFuture(future), null);
deleteBlob(null, blobId, serviceId, CallbackUtils.fromCompletableFuture(future), null);
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void handlePut(RestRequest restRequest, RestResponseChannel restResponseC
public void handleDelete(RestRequest restRequest, RestResponseChannel restResponseChannel) {
if (shouldProceed(restRequest, restResponseChannel)) {
String blobId = getBlobId(restRequest);
router.deleteBlob(blobId, null, new MockDeleteCallback(this, restRequest, restResponseChannel), null);
router.deleteBlob(null, blobId, null, new MockDeleteCallback(this, restRequest, restResponseChannel), null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,40 +116,26 @@ private void start() {
private Callback<Void> securityProcessRequestCallback() {
return buildCallback(metrics.deleteBlobSecurityProcessRequestMetrics, result -> {
String blobIdStr = getRequestPath(restRequest).getOperationOrBlobId(false);
idConverter.convert(restRequest, blobIdStr, idConverterCallback());
LOGGER.debug("Blob Id to convert: " + blobIdStr);
}, restRequest.getUri(), LOGGER, finalCallback);
}

/**
* After {@link IdConverter#convert} finishes, call {@link SecurityService#postProcessRequest} to perform
* request time security checks that rely on the request being fully parsed and any additional arguments set.
* @return a {@link Callback} to be used with {@link IdConverter#convert}.
*/
private Callback<String> idConverterCallback() {
return buildCallback(metrics.deleteBlobIdConversionMetrics, convertedBlobId -> {
BlobId blobId = FrontendUtils.getBlobIdFromString(convertedBlobId, clusterMap);
if (restRequest.getArgs().get(InternalKeys.TARGET_ACCOUNT_KEY) == null) {
// Inject account and container when they are missing from the rest request.
accountAndContainerInjector.injectTargetAccountAndContainerFromBlobId(blobId, restRequest,
metrics.deleteBlobMetricsGroup);
if (!RequestPath.matchesOperation(blobIdStr, Operations.NAMED_BLOB)) {
blobIdStr = blobIdStr.startsWith("/") ? blobIdStr.substring(1) : blobIdStr;
BlobId convertedBlobId = FrontendUtils.getBlobIdFromString(blobIdStr, clusterMap);
restRequest.setArg(RestUtils.InternalKeys.BLOB_ID, convertedBlobId);
accountAndContainerInjector.injectTargetAccountAndContainerFromBlobId(convertedBlobId, restRequest,
metrics.updateBlobTtlMetricsGroup);
}
LOGGER.debug("Converted Blob Id: " + blobId);
securityService.postProcessRequest(restRequest, securityPostProcessRequestCallback(blobId));
securityService.postProcessRequest(restRequest, securityPostProcessRequestCallback());
}, restRequest.getUri(), LOGGER, finalCallback);
}

/**
* After {@link SecurityService#postProcessRequest} finishes, call {@link Router#deleteBlob} to delete
* the blob in the storage layer.
* @param blobId the {@link BlobId} to undelete
* @return a {@link Callback} to be used with {@link SecurityService#postProcessRequest}.
*/
private Callback<Void> securityPostProcessRequestCallback(BlobId blobId) {
private Callback<Void> securityPostProcessRequestCallback() {
return buildCallback(metrics.deleteBlobSecurityPostProcessRequestMetrics, result -> {
String serviceId = RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SERVICE_ID, false);
LOGGER.debug("Start deleting the blob for blobId " + blobId);
router.deleteBlob(blobId.getID(), serviceId, routerCallback(),
router.deleteBlob(restRequest, null, serviceId, routerCallback(),
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false));
}, restRequest.getUri(), LOGGER, finalCallback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3512,7 +3512,8 @@ private void doIdConverterExceptionTest(FrontendTestIdConverterFactory converter
accountAndContainerInjector, datacenterName, hostname, clusterName, accountStatsStore, QUOTA_MANAGER);
frontendRestRequestService.setupResponseHandler(responseHandler);
frontendRestRequestService.start();
RestMethod[] restMethods = {RestMethod.POST, RestMethod.GET, RestMethod.DELETE, RestMethod.HEAD};
//Regular blob delete won't need to go through id converter
RestMethod[] restMethods = {RestMethod.POST, RestMethod.GET, RestMethod.HEAD};
doExternalServicesBadInputTest(restMethods, expectedExceptionMsg, false);
}

Expand Down Expand Up @@ -4444,7 +4445,7 @@ public Future<String> stitchBlob(BlobProperties blobProperties, byte[] userMetad
}

@Override
public Future<Void> deleteBlob(String blobId, String serviceId, Callback<Void> callback,
public Future<Void> deleteBlob(RestRequest restRequest, String blobId, String serviceId, Callback<Void> callback,
QuotaChargeCallback quotaChargeCallback) {
deleteServiceId = serviceId;
return completeOperation(null, callback, OpType.DeleteBlob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void setupBlob() throws Exception {
blobId = router.putBlob(BLOB_PROPERTIES, new byte[0], channel, new PutBlobOptionsBuilder().build())
.get(1, TimeUnit.SECONDS);
idConverterFactory.translation = blobId;
router.deleteBlob(blobId, SERVICE_ID, null, QuotaTestUtils.createTestQuotaChargeCallback(QuotaMethod.WRITE))
router.deleteBlob(null, blobId, SERVICE_ID, null, QuotaTestUtils.createTestQuotaChargeCallback(QuotaMethod.WRITE))
.get(1, TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.github.ambry.repair.RepairRequestsDb;
import com.github.ambry.repair.RepairRequestsDbFactory;
import com.github.ambry.rest.RestRequest;
import com.github.ambry.rest.RestUtils;
import com.github.ambry.store.StoreKey;
import com.github.ambry.utils.Time;
import com.github.ambry.utils.Utils;
Expand All @@ -52,6 +53,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.ambry.rest.RestUtils.*;


/**
* Streaming, non-blocking router implementation for Ambry.
Expand Down Expand Up @@ -471,21 +474,73 @@ public Future<String> stitchBlob(BlobProperties blobProperties, byte[] userMetad

/**
* Requests for a blob to be deleted asynchronously and invokes the {@link Callback} when the request completes.
* @param blobId The ID of the blob that needs to be deleted.
* @param serviceId The service ID of the service deleting the blob. This can be null if unknown.
* @param callback The {@link Callback} which will be invoked on the completion of a request.
*
* @param restRequest The {@link RestRequest} to delete the blob.
* @param blobId The ID of the blob that needs to be deleted.
* @param serviceId The service ID of the service deleting the blob. This can be null if unknown.
* @param callback The {@link Callback} which will be invoked on the completion of a request.
* @return A future that would contain information about whether the deletion succeeded or not, eventually.
*/
@Override
public Future<Void> deleteBlob(String blobId, String serviceId, Callback<Void> callback,
public Future<Void> deleteBlob(RestRequest restRequest, String blobId, String serviceId, Callback<Void> callback,
QuotaChargeCallback quotaChargeCallback) {
if (blobId == null) {
throw new IllegalArgumentException("blobId must not be null");
FutureResult<Void> futureResult = new FutureResult<>();
if (restRequest == null) {
if (blobId == null) {
throw new IllegalArgumentException("blobId must not be null");
}
proceedWithDelete(blobId, serviceId, callback, futureResult, quotaChargeCallback);
} else {
//if the blobId is not named blob based, it could bypass the first round of d converter logic by checking InternalKeys.BLOB_ID.
//First round is to convert named blob to blob Id.
if (restRequest.getArgs().get(RestUtils.InternalKeys.BLOB_ID) != null) {
String blobIdStr =
removeLeadingSlashIfNeeded(restRequest.getArgs().get(RestUtils.InternalKeys.BLOB_ID).toString());
proceedWithDelete(blobIdStr, serviceId, callback, futureResult, quotaChargeCallback);
} else {
try {
//If the blobId is named blob, need to go through convert first.
String blobIdStr = getRequestPath(restRequest).getOperationOrBlobId(true);

// Convert asynchronously and proceed once blobId is available
idConverter.convert(restRequest, blobIdStr, null, new Callback<String>() {
@Override
public void onCompletion(String convertedBlobId, Exception exception) {
if (exception != null) {
callback.onCompletion(null, exception);
} else {
// Call proceedWithTtlUpdate once blobId is available
proceedWithDelete(convertedBlobId, serviceId, callback, futureResult, quotaChargeCallback);
}
}
});
return futureResult; // Return early since we're waiting for the async operation
} catch (Exception e) {
callback.onCompletion(null, e);
return futureResult;
}
}
}
return futureResult;
}

private String removeLeadingSlashIfNeeded(String blobId) {
return blobId.startsWith("/") ? blobId.substring(1) : blobId;
}

/**
* Helper method to perform delete once the blob Id is available.
*/
private void proceedWithDelete(String blobId, String serviceId, Callback<Void> callback,
FutureResult<Void> futureResult, QuotaChargeCallback quotaChargeCallback) {
currentOperationsCount.incrementAndGet();
routerMetrics.deleteBlobOperationRate.mark();
routerMetrics.operationQueuingRate.mark();
FutureResult<Void> futureResult = new FutureResult<>();

if (blobId == null) {
throw new IllegalArgumentException("blobId must not be null");
}

if (isOpen.get()) {
if (notFoundCache.getIfPresent(blobId) != null) {
// If we know that blob doesn't exist, complete the operation
Expand All @@ -509,7 +564,6 @@ public Future<Void> deleteBlob(String blobId, String serviceId, Callback<Void> c
routerMetrics.onDeleteBlobError(routerException);
completeOperation(futureResult, callback, null, routerException);
}
return futureResult;
}

/**
Expand Down Expand Up @@ -579,7 +633,8 @@ public Future<Void> updateBlobTtl(RestRequest restRequest, String blobId, String
callback.onCompletion(null, exception);
};
Callback<Void> wrappedCallback =
restRequest != null ? createIdConverterCallbackForTtlUpdate(restRequest, blobId, futureResult, stringCallback) : callback;
restRequest != null ? createIdConverterCallbackForTtlUpdateAndDelete(restRequest, blobId, futureResult,
stringCallback) : callback;
if (isOpen.get()) {
if (notFoundCache.getIfPresent(blobId) != null) {
// If we know that blob doesn't exist, complete the operation.
Expand Down Expand Up @@ -853,7 +908,7 @@ private Callback<String> createIdConverterCallbackForPut(RestRequest restRequest
* @param blobId the blobId to update ttl.
* @return
*/
private Callback<Void> createIdConverterCallbackForTtlUpdate(RestRequest restRequest, String blobId,
private Callback<Void> createIdConverterCallbackForTtlUpdateAndDelete(RestRequest restRequest, String blobId,
FutureResult<Void> futureResult, Callback<String> callback) {
return (result, exception) -> {
if (exception != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void testAndAssert(RouterErrorCode expectedError) throws Exception {
List<Future> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
if (i == 1) {
futures.add(router.deleteBlob(blobIdString, null, new Callback<Void>() {
futures.add(router.deleteBlob(null, blobIdString, null, new Callback<Void>() {
@Override
public void onCompletion(Void result, Exception exception) {
callbackCalled.countDown();
Expand Down Expand Up @@ -675,7 +675,7 @@ RouterErrorCode.OperationTimedOut, new ErrorCodeChecker() {
public void testAndAssert(RouterErrorCode expectedError) throws Exception {
CountDownLatch operationCompleteLatch = new CountDownLatch(1);
future =
router.deleteBlob(blobIdString, null, new ClientCallback(operationCompleteLatch), quotaChargeCallback);
router.deleteBlob(null, blobIdString, null, new ClientCallback(operationCompleteLatch), quotaChargeCallback);
do {
// increment mock time
mockTime.sleep(1000);
Expand Down Expand Up @@ -707,7 +707,7 @@ public void testSelectorError() throws Exception {
mockSelectorState.set(state);
setServerErrorCodes(serverErrorCodes, partition, serverLayout, localDc);
CountDownLatch operationCompleteLatch = new CountDownLatch(1);
future = router.deleteBlob(blobIdString, null, new ClientCallback(operationCompleteLatch), quotaChargeCallback);
future = router.deleteBlob(null, blobIdString, null, new ClientCallback(operationCompleteLatch), quotaChargeCallback);
do {
// increment mock time
mockTime.sleep(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public QuotaConfig getQuotaConfig() {
router.getBlob(blobId, new GetBlobOptionsBuilder().operationType(GetBlobOptions.OperationType.BlobInfo).build(),
null, quotaChargeCallback).get();
assertEquals(expectedChargeCallbackCount += quotaAccountingSize, listenerCalledCount.get());
router.deleteBlob(blobId, null, null, quotaChargeCallback).get();
router.deleteBlob(null, blobId, null, null, quotaChargeCallback).get();
assertEquals(expectedChargeCallbackCount += quotaAccountingSize, listenerCalledCount.get());
try {
router.getBlob(blobId, new GetBlobOptionsBuilder().build(), null, quotaChargeCallback).get();
Expand Down Expand Up @@ -273,7 +273,7 @@ public QuotaConfig getQuotaConfig() {
assertEquals(0, routerMetrics.updateOptimizedCount.getCount());
assertEquals(1, routerMetrics.updateUnOptimizedCount.getCount());

router.deleteBlob(stitchedBlobId, null, null, quotaChargeCallback).get();
router.deleteBlob(null, stitchedBlobId, null, null, quotaChargeCallback).get();
assertEquals(expectedChargeCallbackCount + quotaAccountingSize, listenerCalledCount.get());
} finally {
router.close();
Expand Down Expand Up @@ -363,7 +363,7 @@ public QuotaConfig getQuotaConfig() {
assertEquals(1, routerMetrics.updateOptimizedCount.getCount());
assertEquals(0, routerMetrics.updateUnOptimizedCount.getCount());

router.deleteBlob(compositeBlobId, null, null, quotaChargeCallback).get();
router.deleteBlob(null, compositeBlobId, null, null, quotaChargeCallback).get();
assertEquals(expectedChargeCallbackCount + quotaAccountingSize, listenerCalledCount.get());
} finally {
router.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2380,7 +2380,7 @@ void routerMetadataCacheErrorOnDeleteCompositeBlob(ServerErrorCode serverErrorCo
// failed delete
mockServerLayout.getMockServers().forEach(mockServer -> mockServer.setServerErrorForAllRequests(serverErrorCode));
TestCallback<Void> testCallback = new TestCallback<>();
Future<Void> future = router.deleteBlob(blobId, null, testCallback, null);
Future<Void> future = router.deleteBlob(null, blobId, null, testCallback, null);
assertFailureAndCheckErrorCode(future, testCallback, routerErrorCode);
assertNotNull("Blob metadata must be present in metadata cache", router.getBlobMetadataCache().getObject(blobId));
mockServerLayout.getMockServers().forEach(mockServer -> mockServer.resetServerErrors());
Expand Down Expand Up @@ -2455,7 +2455,7 @@ public void testBlobNotFoundCache() throws Exception {
assertFailureAndCheckErrorCode(ttlUpdateFuture, ttlUpdateTestCallback, RouterErrorCode.BlobDoesNotExist);

TestCallback<Void> deleteTestCallback = new TestCallback<>();
Future<Void> deleteFuture = router.deleteBlob(blobId, updateServiceId, deleteTestCallback, null);
Future<Void> deleteFuture = router.deleteBlob(null, blobId, updateServiceId, deleteTestCallback, null);
assertFailureAndCheckErrorCode(deleteFuture, deleteTestCallback, RouterErrorCode.BlobDoesNotExist);

TestCallback<Void> unDeleteTestCallback = new TestCallback<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ void check() throws Exception {
opChain.testFutures.add(testFuture);
return;
}
Future<Void> future = router.deleteBlob(fraudId.getID(), null, callback, quotaChargeCallback);
Future<Void> future = router.deleteBlob(null, fraudId.getID(), null, callback, quotaChargeCallback);
TestFuture<Void> testFuture = new TestFuture<Void>(future, genLabel("deleteBlobAuthorizationFail", true), opChain) {
@Override
void check() throws Exception {
Expand Down Expand Up @@ -463,7 +463,7 @@ void check() throws Exception {
*/
private void startDeleteBlob(final OperationChain opChain) {
Callback<Void> callback = new TestCallback<>(opChain, false);
Future<Void> future = router.deleteBlob(opChain.blobId, null, callback, quotaChargeCallback);
Future<Void> future = router.deleteBlob(null, opChain.blobId, null, callback, quotaChargeCallback);
TestFuture<Void> testFuture = new TestFuture<Void>(future, genLabel("deleteBlob", false), opChain) {
@Override
void check() throws Exception {
Expand Down
Loading

0 comments on commit ab0a10d

Please sign in to comment.