-
Notifications
You must be signed in to change notification settings - Fork 275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reconstruct blobId by excluding partition Id #2910
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,12 +14,15 @@ | |
package com.github.ambry.frontend; | ||
|
||
import com.github.ambry.account.Container; | ||
import com.github.ambry.clustermap.ClusterMap; | ||
import com.github.ambry.commons.BlobId; | ||
import com.github.ambry.commons.Callback; | ||
import com.github.ambry.commons.RetainingAsyncWritableChannel; | ||
import com.github.ambry.config.FrontendConfig; | ||
import com.github.ambry.config.RouterConfig; | ||
import com.github.ambry.messageformat.BlobInfo; | ||
import com.github.ambry.messageformat.BlobProperties; | ||
import com.github.ambry.notification.NotificationBlobType; | ||
import com.github.ambry.quota.QuotaManager; | ||
import com.github.ambry.quota.QuotaUtils; | ||
import com.github.ambry.rest.RequestPath; | ||
|
@@ -49,6 +52,7 @@ | |
import static com.github.ambry.frontend.FrontendUtils.*; | ||
import static com.github.ambry.rest.RestUtils.*; | ||
import static com.github.ambry.rest.RestUtils.Headers.*; | ||
import static com.github.ambry.rest.RestUtils.InternalKeys.*; | ||
|
||
|
||
/** | ||
|
@@ -81,23 +85,27 @@ class PostBlobHandler { | |
private final FrontendMetrics frontendMetrics; | ||
private final String clusterName; | ||
private final QuotaManager quotaManager; | ||
private final RouterConfig routerConfig; | ||
private final ClusterMap clusterMap; | ||
|
||
/** | ||
* Constructs a handler for handling requests for uploading or stitching blobs. | ||
* @param securityService the {@link SecurityService} to use. | ||
* @param idConverter the {@link IdConverter} to use. | ||
* @param idSigningService the {@link IdSigningService} to use. | ||
* @param router the {@link Router} to use. | ||
* | ||
* @param securityService the {@link SecurityService} to use. | ||
* @param idConverter the {@link IdConverter} to use. | ||
* @param idSigningService the {@link IdSigningService} to use. | ||
* @param router the {@link Router} to use. | ||
* @param accountAndContainerInjector helper to resolve account and container for a given request. | ||
* @param time the {@link Time} instance to use. | ||
* @param frontendConfig the {@link FrontendConfig} to use. | ||
* @param frontendMetrics {@link FrontendMetrics} instance where metrics should be recorded. | ||
* @param clusterName the name of the storage cluster that the router communicates with | ||
* @param quotaManager {@link QuotaManager} instance to charge against quota for each chunk. | ||
* @param time the {@link Time} instance to use. | ||
* @param frontendConfig the {@link FrontendConfig} to use. | ||
* @param frontendMetrics {@link FrontendMetrics} instance where metrics should be recorded. | ||
* @param clusterName the name of the storage cluster that the router communicates with | ||
* @param quotaManager {@link QuotaManager} instance to charge against quota for each chunk. | ||
* @param clusterMap The {@link ClusterMap} to use. | ||
*/ | ||
PostBlobHandler(SecurityService securityService, IdConverter idConverter, IdSigningService idSigningService, | ||
Router router, AccountAndContainerInjector accountAndContainerInjector, Time time, FrontendConfig frontendConfig, | ||
FrontendMetrics frontendMetrics, String clusterName, QuotaManager quotaManager) { | ||
FrontendMetrics frontendMetrics, String clusterName, QuotaManager quotaManager, ClusterMap clusterMap) { | ||
this.securityService = securityService; | ||
this.idConverter = idConverter; | ||
this.idSigningService = idSigningService; | ||
|
@@ -108,6 +116,8 @@ class PostBlobHandler { | |
this.frontendMetrics = frontendMetrics; | ||
this.clusterName = clusterName; | ||
this.quotaManager = quotaManager; | ||
this.routerConfig = router.getRouterConfig(); | ||
this.clusterMap = clusterMap; | ||
} | ||
|
||
/** | ||
|
@@ -194,6 +204,7 @@ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) { | |
restRequest.readInto(channel, fetchStitchRequestBodyCallback(channel, blobInfo)); | ||
} else { | ||
PutBlobOptions options = getPutBlobOptionsFromRequest(); | ||
constructBlobIdAndSetIntoRestRequestInternalHeader(blobInfo, options); | ||
router.putBlob(null, blobInfo.getBlobProperties(), blobInfo.getUserMetadata(), restRequest, options, | ||
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)); | ||
} | ||
|
@@ -468,5 +479,21 @@ private String getAndVerifyReservedMetadataBlobId(Map<String, String> metadata, | |
} | ||
return chunkReservedMetadataBlobId; | ||
} | ||
|
||
/** | ||
* Reconstruct the blob Id by excluding the partition Id info. | ||
* @param blobInfo the {@link BlobInfo} | ||
* @param options the {@link PutBlobOptions} | ||
*/ | ||
private void constructBlobIdAndSetIntoRestRequestInternalHeader(BlobInfo blobInfo, PutBlobOptions options) { | ||
BlobProperties blobProperties = blobInfo.getBlobProperties(); | ||
boolean isSimpleBlob = !options.isChunkUpload() && !options.skipCompositeChunk(); | ||
//Chunk upload does not need the customized blobId | ||
BlobId.BlobDataType blobDataType = isSimpleBlob ? BlobId.BlobDataType.SIMPLE : BlobId.BlobDataType.METADATA; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm.. I think we won't know if it is a "simple blob" or "composite blob" until we read data from Netty. Once we cross 4 MB, it becomes composite blob https://github.com/linkedin/ambry/blob/master/ambry-router/src/main/java/com/github/ambry/router/PutOperation.java#L688. I am wondering if we need to exclude this field as well during Ambry ID -> Caspian ID mapping |
||
BlobId customizedBlobId = new BlobId(routerConfig.routerBlobidCurrentVersion, BlobId.BlobIdType.NATIVE, | ||
clusterMap.getLocalDatacenterId(), blobProperties.getAccountId(), blobProperties.getContainerId(), | ||
blobProperties.isEncrypted(), blobDataType, blobProperties.getReservedUuid()); | ||
restRequest.setArg(BLOB_ID_EXCLUDE_PARTITION, customizedBlobId.getID()); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like we were discussing offline, even though our CFG2 has version as 6, I am wondering if it may not be safe to change default value to 7 until these changes are rolled out!