Reconstruct blobId by excluding partition Id #2910

Closed. Wants to merge 1 commit.
@@ -378,7 +378,7 @@ public class RouterConfig {
* The version to use for new BlobIds.
*/
@Config(ROUTER_BLOBID_CURRENT_VERSION)
@Default("5")
@Default("7")
public final short routerBlobidCurrentVersion;

/**
@@ -845,8 +845,8 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
verifiableProperties.getDoubleInRange(ROUTER_LATENCY_TOLERANCE_QUANTILE, DEFAULT_LATENCY_TOLERANCE_QUANTILE,
0.0, 1.0);
routerBlobidCurrentVersion =
verifiableProperties.getShortFromAllowedValues(ROUTER_BLOBID_CURRENT_VERSION, (short) 6,
new Short[]{1, 2, 3, 4, 5, 6});
verifiableProperties.getShortFromAllowedValues(ROUTER_BLOBID_CURRENT_VERSION, (short) 7,
new Short[]{1, 2, 3, 4, 5, 6, 7});

Reviewer comment (Contributor): Like we were discussing offline, even though our CFG2 has the version as 6, I am wondering if it may not be safe to change the default value to 7 until these changes are rolled out!
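A minimal sketch of the rollout-safety idea in the comment above, not part of this PR: keep the default at the currently deployed version while still permitting 7 as an allowed value, and flip the default only after the new code is everywhere.

```java
// Hypothetical interim configuration: default stays at 6, version 7 is merely allowed.
routerBlobidCurrentVersion =
    verifiableProperties.getShortFromAllowedValues(ROUTER_BLOBID_CURRENT_VERSION, (short) 6,
        new Short[]{1, 2, 3, 4, 5, 6, 7});
```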
routerMetadataContentVersion =
verifiableProperties.getShortFromAllowedValues(ROUTER_METADATA_CONTENT_VERSION, (short) 2, new Short[]{2, 3});
routerKeyManagementServiceFactory =
5 changes: 5 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java
@@ -505,6 +505,11 @@ public static final class InternalKeys {
* content-length header
*/
public static final String CONTENT_RANGE_LENGTH = KEY_PREFIX + "content-range-length";

/**
* The re-constructed blob Id, which excludes the partition Id info.
*/
public static final String BLOB_ID_EXCLUDE_PARTITION = KEY_PREFIX + "blob-id-exclude-partition";
}

/**
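A minimal usage sketch for the new internal key, not part of this PR; it assumes a RestRequest named restRequest is in scope and reads the value back through RestRequest.getArgs().

```java
// Hypothetical downstream read of the reconstructed, partition-less blob id.
String partitionlessBlobId =
    (String) restRequest.getArgs().get(RestUtils.InternalKeys.BLOB_ID_EXCLUDE_PARTITION);
if (partitionlessBlobId != null) {
  // e.g. hand the id to an external mapping service that does not know Ambry partitions.
}
```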
50 changes: 48 additions & 2 deletions ambry-commons/src/main/java/com/github/ambry/commons/BlobId.java
@@ -142,6 +142,7 @@ public class BlobId extends StoreKey {
public static final short BLOB_ID_V4 = 4;
public static final short BLOB_ID_V5 = 5;
public static final short BLOB_ID_V6 = 6;
public static final short BLOB_ID_V7 = 7;
private static final short VERSION_FIELD_LENGTH_IN_BYTES = Short.BYTES;
private static final short UUID_SIZE_FIELD_LENGTH_IN_BYTES = Integer.BYTES;
private static final short FLAG_FIELD_LENGTH_IN_BYTES = Byte.BYTES;
@@ -267,12 +268,13 @@ public BlobId(short version, BlobIdType type, byte datacenterId, short accountId
this.uuidStr = uuidStr;
break;
case BLOB_ID_V6:
case BLOB_ID_V7:
this.type = type;
this.datacenterId = datacenterId;
this.accountId = accountId;
this.containerId = containerId;
this.isEncrypted = isEncrypted;
this.blobDataType = Objects.requireNonNull(blobDataType, "blobDataType can't be null for id version 6");
this.blobDataType = Objects.requireNonNull(blobDataType, "blobDataType can't be null for id version 6 or 7");
this.uuid = UUID.fromString(uuidStr);
this.uuidStr = null;
break;
@@ -283,6 +285,30 @@ public BlobId(short version, BlobIdType type, byte datacenterId, short accountId
this.partitionId = partitionId;
}

/**
* Construct a blobId that excludes the partitionId and uses a reserved UUID.
* @param version the version in which this blob should be created.
* @param type The {@link BlobIdType} of the blob to be created. Only relevant for V3 and above.
* @param datacenterId The id of the datacenter to be embedded into the blob. Only relevant for V2 and above.
* @param accountId The id of the {@link Account} to be embedded into the blob. Only relevant for V2 and above.
* @param containerId The id of the {@link Container} to be embedded into the blob. Only relevant for V2 and above.
* @param isEncrypted {@code true} if blob that this blobId represents is encrypted. {@code false} otherwise
* @param blobDataType The {@link BlobDataType} of the blob to be created.
* @param uuidStr The uuid that is to be used to construct this id.
*/
public BlobId(short version, BlobIdType type, byte datacenterId, short accountId, short containerId,
boolean isEncrypted, BlobDataType blobDataType, String uuidStr) {
this.type = type;
this.datacenterId = datacenterId;
this.accountId = accountId;
this.containerId = containerId;
this.isEncrypted = isEncrypted;
this.blobDataType = Objects.requireNonNull(blobDataType, "blobDataType can't be null for id version " + version);
this.uuid = UUID.fromString(uuidStr);
this.uuidStr = null;
this.version = version;
this.partitionId = null;
}

/**
* Re-constructs existing blobId by deserializing from data input stream. This constructor includes an optional check
* that the stream has no more available bytes after reading.
@@ -308,6 +334,7 @@ private BlobId(DataInputStream stream, ClusterMap clusterMap, boolean ensureFull
}
switch (version) {
case BLOB_ID_V6:
case BLOB_ID_V7:
uuid = UuidSerDe.deserialize(stream);
uuidStr = null;
break;
@@ -363,6 +390,10 @@ public short sizeInBytes() {
return (short) (VERSION_FIELD_LENGTH_IN_BYTES + FLAG_FIELD_LENGTH_IN_BYTES + DATACENTER_ID_FIELD_LENGTH_IN_BYTES
+ ACCOUNT_ID_FIELD_LENGTH_IN_BYTES + CONTAINER_ID_FIELD_LENGTH_IN_BYTES + partitionId.getBytes().length
+ UuidSerDe.SIZE_IN_BYTES);
case BLOB_ID_V7:
return (short) (VERSION_FIELD_LENGTH_IN_BYTES + FLAG_FIELD_LENGTH_IN_BYTES + DATACENTER_ID_FIELD_LENGTH_IN_BYTES
+ ACCOUNT_ID_FIELD_LENGTH_IN_BYTES + CONTAINER_ID_FIELD_LENGTH_IN_BYTES + (partitionId == null ? 0
: partitionId.getBytes().length) + UuidSerDe.SIZE_IN_BYTES);
default:
throw new IllegalArgumentException("blobId version=" + version + " not supported");
}
@@ -401,6 +432,12 @@ public short getContainerId() {
return containerId;
}

public boolean isEncrypted() {
return isEncrypted;
}

public BlobIdType getBlobIdType() {
return type;
}

/**
* @return true if accountId and containerId in key match given accountId and containerId from store.
* Always return true if BlobId version is {@link #BLOB_ID_V1}.
@@ -482,6 +519,7 @@ public byte[] toBytes() {
break;
case BLOB_ID_V5:
case BLOB_ID_V6:
case BLOB_ID_V7:
flag = (byte) (type.ordinal() & BLOB_ID_TYPE_MASK);
flag |= isEncrypted ? IS_ENCRYPTED_MASK : 0;
flag |= (blobDataType.ordinal() << BLOB_DATA_TYPE_SHIFT);
@@ -493,9 +531,12 @@ public byte[] toBytes() {
default:
throw new IllegalArgumentException("blobId version=" + version + " not supported");
}
idBuf.put(partitionId.getBytes());
if (partitionId != null) {
idBuf.put(partitionId.getBytes());
}
switch (version) {
case BLOB_ID_V6:
case BLOB_ID_V7:
UuidSerDe.serialize(uuid, idBuf);
break;
default:
@@ -521,6 +562,7 @@ public byte[] getUuidBytesArray() {
uuidBuf.put(uuidBytes);
break;
case BLOB_ID_V6:
case BLOB_ID_V7:
uuidBuf = ByteBuffer.allocate(UuidSerDe.SIZE_IN_BYTES);
UuidSerDe.serialize(uuid, uuidBuf);
break;
@@ -548,6 +590,7 @@ public String getLongForm() {
case BLOB_ID_V4:
case BLOB_ID_V5:
case BLOB_ID_V6:
case BLOB_ID_V7:
sb.append(":").append(type);
sb.append(":").append(datacenterId);
sb.append(":").append(accountId);
@@ -617,6 +660,7 @@ public int compareTo(StoreKey o) {
result = uuidStr.compareTo(other.uuidStr);
break;
case BLOB_ID_V6:
case BLOB_ID_V7:
result = uuid.compareTo(other.uuid);
break;
default:
@@ -643,6 +687,7 @@ private short getVersionComparisonGroup() {
case BLOB_ID_V5:
return 3;
case BLOB_ID_V6:
case BLOB_ID_V7:
return 4;
default:
throw new IllegalArgumentException("Unrecognized blobId version " + version);
@@ -873,6 +918,7 @@ private static class BlobIdPreamble {
break;
case BLOB_ID_V5:
case BLOB_ID_V6:
case BLOB_ID_V7:
blobIdFlag = stream.readByte();
type = BlobIdType.values()[blobIdFlag & BLOB_ID_TYPE_MASK];
isEncrypted = (blobIdFlag & IS_ENCRYPTED_MASK) != 0;
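A minimal usage sketch of the partition-less constructor added above, not part of this PR; the datacenter, account, and container ids and the random UUID below are made-up example values.

```java
// Construct a V7 id with no partition; toBytes() skips the partition bytes because partitionId is null.
BlobId partitionlessId = new BlobId(BlobId.BLOB_ID_V7, BlobId.BlobIdType.NATIVE, (byte) 1,
    (short) 101, (short) 5, false, BlobId.BlobDataType.SIMPLE, java.util.UUID.randomUUID().toString());
byte[] serialized = partitionlessId.toBytes();
```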
@@ -193,7 +193,7 @@ public void start() throws InstantiationException {
frontendMetrics, clusterMap, quotaManager, accountService);
postBlobHandler =
new PostBlobHandler(securityService, idConverter, idSigningService, router, accountAndContainerInjector,
SystemTime.getInstance(), frontendConfig, frontendMetrics, clusterName, quotaManager);
SystemTime.getInstance(), frontendConfig, frontendMetrics, clusterName, quotaManager, clusterMap);

ttlUpdateHandler =
new TtlUpdateHandler(router, securityService, idConverter, accountAndContainerInjector, frontendMetrics,
@@ -14,12 +14,15 @@
package com.github.ambry.frontend;

import com.github.ambry.account.Container;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.commons.BlobId;
import com.github.ambry.commons.Callback;
import com.github.ambry.commons.RetainingAsyncWritableChannel;
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.config.RouterConfig;
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.notification.NotificationBlobType;
import com.github.ambry.quota.QuotaManager;
import com.github.ambry.quota.QuotaUtils;
import com.github.ambry.rest.RequestPath;
@@ -49,6 +52,7 @@
import static com.github.ambry.frontend.FrontendUtils.*;
import static com.github.ambry.rest.RestUtils.*;
import static com.github.ambry.rest.RestUtils.Headers.*;
import static com.github.ambry.rest.RestUtils.InternalKeys.*;


/**
@@ -81,23 +85,27 @@ class PostBlobHandler {
private final FrontendMetrics frontendMetrics;
private final String clusterName;
private final QuotaManager quotaManager;
private final RouterConfig routerConfig;
private final ClusterMap clusterMap;

/**
* Constructs a handler for handling requests for uploading or stitching blobs.
* @param securityService the {@link SecurityService} to use.
* @param idConverter the {@link IdConverter} to use.
* @param idSigningService the {@link IdSigningService} to use.
* @param router the {@link Router} to use.
*
* @param securityService the {@link SecurityService} to use.
* @param idConverter the {@link IdConverter} to use.
* @param idSigningService the {@link IdSigningService} to use.
* @param router the {@link Router} to use.
* @param accountAndContainerInjector helper to resolve account and container for a given request.
* @param time the {@link Time} instance to use.
* @param frontendConfig the {@link FrontendConfig} to use.
* @param frontendMetrics {@link FrontendMetrics} instance where metrics should be recorded.
* @param clusterName the name of the storage cluster that the router communicates with
* @param quotaManager {@link QuotaManager} instance to charge against quota for each chunk.
* @param time the {@link Time} instance to use.
* @param frontendConfig the {@link FrontendConfig} to use.
* @param frontendMetrics {@link FrontendMetrics} instance where metrics should be recorded.
* @param clusterName the name of the storage cluster that the router communicates with
* @param quotaManager {@link QuotaManager} instance to charge against quota for each chunk.
* @param clusterMap The {@link ClusterMap} to use.
*/
PostBlobHandler(SecurityService securityService, IdConverter idConverter, IdSigningService idSigningService,
Router router, AccountAndContainerInjector accountAndContainerInjector, Time time, FrontendConfig frontendConfig,
FrontendMetrics frontendMetrics, String clusterName, QuotaManager quotaManager) {
FrontendMetrics frontendMetrics, String clusterName, QuotaManager quotaManager, ClusterMap clusterMap) {
this.securityService = securityService;
this.idConverter = idConverter;
this.idSigningService = idSigningService;
@@ -108,6 +116,8 @@ class PostBlobHandler {
this.frontendMetrics = frontendMetrics;
this.clusterName = clusterName;
this.quotaManager = quotaManager;
this.routerConfig = router.getRouterConfig();
this.clusterMap = clusterMap;
}

/**
@@ -194,6 +204,7 @@ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
restRequest.readInto(channel, fetchStitchRequestBodyCallback(channel, blobInfo));
} else {
PutBlobOptions options = getPutBlobOptionsFromRequest();
constructBlobIdAndSetIntoRestRequestInternalHeader(blobInfo, options);
router.putBlob(null, blobInfo.getBlobProperties(), blobInfo.getUserMetadata(), restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
}
@@ -468,5 +479,21 @@ private String getAndVerifyReservedMetadataBlobId(Map<String, String> metadata,
}
return chunkReservedMetadataBlobId;
}

/**
* Reconstruct the blob Id by excluding the partition Id info.
* @param blobInfo the {@link BlobInfo}
* @param options the {@link PutBlobOptions}
*/
private void constructBlobIdAndSetIntoRestRequestInternalHeader(BlobInfo blobInfo, PutBlobOptions options) {
BlobProperties blobProperties = blobInfo.getBlobProperties();
boolean isSimpleBlob = !options.isChunkUpload() && !options.skipCompositeChunk();
// Chunk upload does not need the customized blobId
BlobId.BlobDataType blobDataType = isSimpleBlob ? BlobId.BlobDataType.SIMPLE : BlobId.BlobDataType.METADATA;
BlobId customizedBlobId = new BlobId(routerConfig.routerBlobidCurrentVersion, BlobId.BlobIdType.NATIVE,
clusterMap.getLocalDatacenterId(), blobProperties.getAccountId(), blobProperties.getContainerId(),
blobProperties.isEncrypted(), blobDataType, blobProperties.getReservedUuid());
restRequest.setArg(BLOB_ID_EXCLUDE_PARTITION, customizedBlobId.getID());
}
}
}

Reviewer comment (Contributor), on the blobDataType line above: Hmm.. I think we won't know if it is a "simple blob" or "composite blob" until we read data from Netty. Once we cross 4 MB, it becomes a composite blob (https://github.com/linkedin/ambry/blob/master/ambry-router/src/main/java/com/github/ambry/router/PutOperation.java#L688). I am wondering if we need to exclude this field as well during the Ambry ID -> Caspian ID mapping.
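A minimal sketch of the concern in the comment above, not part of this PR: whether an upload ends up simple or composite is only known after the request body has been read, so any data-type flag chosen at POST time is a guess. The names maxChunkSizeBytes and bytesReadFromChannel are illustrative, not Ambry APIs.

```java
// The real decision happens later, once the amount of uploaded data is known.
long maxChunkSizeBytes = 4 * 1024 * 1024;  // assumed chunking threshold
long bytesReadFromChannel = 0L;            // known only after reading the request body
BlobId.BlobDataType actualDataType = bytesReadFromChannel > maxChunkSizeBytes
    ? BlobId.BlobDataType.METADATA : BlobId.BlobDataType.SIMPLE;
```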
@@ -82,6 +82,7 @@
import com.github.ambry.router.Router;
import com.github.ambry.router.RouterErrorCode;
import com.github.ambry.router.RouterException;
import com.github.ambry.router.RouterUtils;
import com.github.ambry.server.StatsReportType;
import com.github.ambry.server.StorageStatsUtilTest;
import com.github.ambry.server.storagestats.AggregatedAccountStorageStats;
@@ -191,6 +192,7 @@ public class FrontendRestRequestServiceTest {
private AccountAndContainerInjector accountAndContainerInjector;
private final String SECURE_PATH_PREFIX = "secure-path";
private final int CONTENT_LENGTH = 1024;
private final RouterConfig routerConfig;

/**
* Sets up the {@link FrontendRestRequestService} instance before a test.
@@ -213,10 +215,11 @@ public FrontendRestRequestServiceTest() throws Exception {
frontendMetrics = new FrontendMetrics(metricRegistry, frontendConfig);
accountAndContainerInjector = new AccountAndContainerInjector(accountService, frontendMetrics, frontendConfig);
String endpoint = "http://localhost:1174";
routerConfig = new RouterConfig(verifiableProperties);
urlSigningService = new AmbryUrlSigningService(endpoint, endpoint, frontendConfig.urlSignerDefaultUrlTtlSecs,
frontendConfig.urlSignerDefaultMaxUploadSizeBytes, frontendConfig.urlSignerMaxUrlTtlSecs,
frontendConfig.chunkUploadInitialChunkTtlSecs, 4 * 1024 * 1024, SystemTime.getInstance(), clusterMap,
new ClusterMapConfig(verifiableProperties), new RouterConfig(verifiableProperties));
new ClusterMapConfig(verifiableProperties), routerConfig);
idSigningService = new AmbryIdSigningService();
namedBlobDb = mock(NamedBlobDb.class);
idConverterFactory =
@@ -4396,6 +4399,8 @@ enum OpType {
String ttlUpdateServiceId = null;
String undeleteServiceId = null;
IdConverter idConverter;
Properties configProps = new Properties();


public FrontendTestRouter(IdConverterFactory idConverterFactory) {
if (idConverterFactory != null) {
@@ -4466,7 +4471,10 @@ public Future<Void> undeleteBlob(String blobId, String serviceId, Callback<Void>

@Override
public RouterConfig getRouterConfig() {
return null;
configProps.setProperty("router.hostname", "localhost");
configProps.setProperty("router.datacenter.name", "dc1");
VerifiableProperties properties = new VerifiableProperties(configProps);
return new RouterConfig(properties);
}

@Override
@@ -29,7 +29,6 @@
import com.github.ambry.commons.CommonTestUtils;
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.named.NamedBlobDb;
import com.github.ambry.named.NamedBlobDbFactory;