Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throughput control optimizations #1

Open
wants to merge 2 commits into
base: ThroughputControl-Global
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,17 @@ private Mono<IThroughputContainerController> createAndInitContainerController(St
checkArgument(StringUtils.isNotEmpty(containerLink), "Container link should not be null or empty");

if (this.groupMapByContainer.containsKey(containerLink)) {
return Mono.just(this.groupMapByContainer.get(containerLink))
.flatMap(groups -> {
ThroughputContainerController containerController =
new ThroughputContainerController(
this.connectionMode,
this.globalEndpointManager,
groups,
this.partitionKeyRangeCache);

return containerController.init();
});
Set<ThroughputControlGroupInternal> groups =
this.groupMapByContainer.get(containerLink);
ThroughputContainerController containerController =
new ThroughputContainerController(
this.connectionMode,
this.globalEndpointManager,
groups,
this.partitionKeyRangeCache);
return containerController.init();
} else {
return Mono.just(new EmptyThroughputContainerController())
.flatMap(EmptyThroughputContainerController::init);
return Mono.just(new EmptyThroughputContainerController());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ private ThroughputResolveLevel getThroughputResolveLevel(Set<ThroughputControlGr
@SuppressWarnings("unchecked")
public <T> Mono<T> init() {
return this.resolveContainerMaxThroughput()
.flatMap(controller -> this.createAndInitializeGroupControllers())
.doOnSuccess(controller -> {
Schedulers.parallel().schedule(() -> this.refreshContainerMaxThroughputTask(this.cancellationTokenSource.getToken()).subscribe());
})
.thenReturn((T) this);
.then(this.createAndInitializeGroupControllers())
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but for then, "this.createAndInitializeGroupControllers" will still be executed right? The intention here is to only execute when the previous step succeeded?

.then(Mono.fromRunnable(() -> {
this.refreshContainerMaxThroughputTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe();
}))
.thenReturn((T) this);
}

private Mono<String> resolveDatabaseResourceId() {
Expand Down Expand Up @@ -145,43 +145,73 @@ private Mono<ThroughputResponse> resolveContainerThroughput() {
}

private Mono<ThroughputContainerController> resolveContainerMaxThroughput() {
return Mono.just(this.throughputResolveLevel) // TODO: ---> test whether it works without defer
.flatMap(throughputResolveLevel -> {
if (throughputResolveLevel == ThroughputResolveLevel.CONTAINER) {
return this.resolveContainerThroughput()
.onErrorResume(throwable -> {
if (this.isOfferNotConfiguredException(throwable)) {
this.throughputResolveLevel = ThroughputResolveLevel.DATABASE;
}

return Mono.error(throwable);
});
} else if (throughputResolveLevel == ThroughputResolveLevel.DATABASE) {
return this.resolveDatabaseThroughput()
.onErrorResume(throwable -> {
if (this.isOfferNotConfiguredException(throwable)) {
this.throughputResolveLevel = ThroughputResolveLevel.CONTAINER;
}

return Mono.error(throwable);
});
}

// All the underlying throughput control groups are using target throughput,
// which is constant value, hence no need to resolve throughput
return Mono.empty();
})
.flatMap(throughputResponse -> {
this.updateMaxContainerThroughput(throughputResponse);
return Mono.empty();
})
.retryWhen(
// Throughput can be configured on database level or container level
// Retry at most 1 time so we can try on database and container both
RetrySpec.max(1).filter(throwable -> this.isOfferNotConfiguredException(throwable))
).thenReturn(this);
if (ThroughputResolveLevel.NONE.equals(throughputResolveLevel)) {
return Mono.empty();
}
final Mono<ThroughputResponse> throughputResponseMono;
if (throughputResolveLevel == ThroughputResolveLevel.CONTAINER) {
throughputResponseMono = this.resolveContainerThroughput()
.onErrorResume(throwable -> {
if (this.isOfferNotConfiguredException(throwable)) {
this.throughputResolveLevel = ThroughputResolveLevel.DATABASE;
return this.resolveDatabaseThroughput();
}
return Mono.error(throwable);
});
} else {
throughputResponseMono = this.resolveDatabaseThroughput()
.onErrorResume(throwable -> {
if (this.isOfferNotConfiguredException(throwable)) {
this.throughputResolveLevel = ThroughputResolveLevel.CONTAINER;
return this.resolveContainerThroughput();
}
return Mono.error(throwable);
});
}
return throughputResponseMono.flatMap(throughputResponse -> {
this.updateMaxContainerThroughput(throughputResponse);
return Mono.just(this);
});
}

// private Mono<ThroughputContainerController> resolveContainerMaxThroughput() {
// return Mono.just(this.throughputResolveLevel) // TODO: ---> test whether it works without defer
// .flatMap(throughputResolveLevel -> {
// if (throughputResolveLevel == ThroughputResolveLevel.CONTAINER) {
// return this.resolveContainerThroughput()
// .onErrorResume(throwable -> {
// if (this.isOfferNotConfiguredException(throwable)) {
// this.throughputResolveLevel = ThroughputResolveLevel.DATABASE;
// }
//
// return Mono.error(throwable);
// });
// } else if (throughputResolveLevel == ThroughputResolveLevel.DATABASE) {
// return this.resolveDatabaseThroughput()
// .onErrorResume(throwable -> {
// if (this.isOfferNotConfiguredException(throwable)) {
// this.throughputResolveLevel = ThroughputResolveLevel.CONTAINER;
// }
//
// return Mono.error(throwable);
// });
// }
//
// // All the underlying throughput control groups are using target throughput,
// // which is constant value, hence no need to resolve throughput
// return Mono.empty();
// })
// .flatMap(throughputResponse -> {
// this.updateMaxContainerThroughput(throughputResponse);
// return Mono.empty();
// })
// .retryWhen(
// // Throughput can be configured on database level or container level
// // Retry at most 1 time so we can try on database and container both
// RetrySpec.max(1).filter(throwable -> this.isOfferNotConfiguredException(throwable))
// ).thenReturn(this);
// }

private Mono<ThroughputResponse> resolveThroughputByResourceId(String resourceId) {
// Note: for serverless account, when we trying to query offers,
// we will get 400/0 with error message: Reading or replacing offers is not supported for serverless accounts.
Expand Down Expand Up @@ -298,24 +328,24 @@ private Mono<ThroughputGroupControllerBase> createAndInitializeGroupController(T

}

private Flux<Void> refreshContainerMaxThroughputTask(CancellationToken cancellationToken) {
private Mono<Void> refreshContainerMaxThroughputTask(CancellationToken cancellationToken) {
checkNotNull(cancellationToken, "Cancellation token can not be null");

if (this.throughputResolveLevel == ThroughputResolveLevel.NONE) {
return Flux.empty();
return Mono.empty();
}

return Mono.delay(DEFAULT_THROUGHPUT_REFRESH_INTERVAL)
.flatMap(t -> this.resolveContainerMaxThroughput())
.then(this.resolveContainerMaxThroughput())
.flatMapIterable(controller -> this.groups)
.flatMap(group -> this.resolveThroughputGroupController(group))
.doOnNext(groupController -> groupController.onContainerMaxThroughputRefresh(this.maxContainerThroughput.get()))
.onErrorResume(throwable -> {
logger.warn("Refresh throughput failed with reason %s", throwable);
return Mono.empty();
})
.then()
.repeat(() -> !cancellationToken.isCancellationRequested());
.repeat(() -> !cancellationToken.isCancellationRequested())
.then();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.throughputControl.config.ThroughputGlobalControlGroup;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import org.slf4j.Logger;
Expand Down Expand Up @@ -106,9 +107,8 @@ public Mono<ThroughputGlobalControlConfigItem> getOrCreateConfigItem() {

if (!expectedConfigItem.equals(configItem)) {
logger.warn(
"Group config using by this client is different than the one in control container, will be ignored. Using following config: {}" +
"targetThroughput: {}, targetThroughputThreshold: {}",
this.configItem.toString());
"Group config using by this client is different than the one in control container, will be ignored. "
+ "Using following config: {}", this.configItem);
}

return Mono.just(this.configItem);
Expand Down Expand Up @@ -166,8 +166,8 @@ public Mono<ThroughputGlobalControlClientItem> replaceOrCreateGroupClientItem(do
*/
public Mono<ThroughputControlContainerManager> validateControlContainer() {
return this.globalControlContainer.read()
.map(containerResponse -> containerResponse.getProperties())
.flatMap(containerProperties -> {
.flatMap(containerResponse -> {
CosmosContainerProperties containerProperties = containerResponse.getProperties();
boolean isPartitioned =
containerProperties.getPartitionKeyDefinition() != null &&
containerProperties.getPartitionKeyDefinition().getPaths() != null &&
Expand All @@ -177,9 +177,7 @@ public Mono<ThroughputControlContainerManager> validateControlContainer() {
|| !containerProperties.getPartitionKeyDefinition().getPaths().get(0).equals(PARTITION_KEY_PATH))) {
return Mono.error(new IllegalArgumentException("The control container must have partition key equal to " + PARTITION_KEY_PATH));
}

return Mono.empty();
})
.thenReturn(this);
return Mono.just(this);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ public ThroughputGroupGlobalController(
@Override
@SuppressWarnings("unchecked")
public <T> Mono<T> init() {
return this.containerManager.validateControlContainer()
.flatMap(dummy -> this.containerManager.getOrCreateConfigItem())
.flatMap(dummy -> {
double loadFactor = this.calculateLoadFactor();
return this.containerManager.createGroupClientItem(loadFactor)
.flatMap(clientItem -> this.calculateClientThroughputShare(loadFactor));
})
.flatMap(dummy -> this.resolveRequestController())
.doOnSuccess(dummy -> {
return this.containerManager
.validateControlContainer()
.then(this.containerManager.getOrCreateConfigItem())
.then(Mono.just(this.calculateLoadFactor()))
.flatMap(loadFactor -> this.containerManager
.createGroupClientItem(loadFactor)
.flatMap(clientItem -> this.calculateClientThroughputShare(loadFactor)))
.then(this.resolveRequestController())
.then(Mono.fromRunnable(() -> {
this.throughputUsageCycleRenewTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe();
this.calculateClientThroughputShareTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe();
})
.thenReturn((T)this);
}))
.thenReturn((T) this);
}

@Override
Expand All @@ -82,8 +82,10 @@ public void recordThroughputUsage(double throughputUsage) {

private Mono<ThroughputGroupGlobalController> calculateClientThroughputShare(double loadFactor) {
return this.containerManager.queryLoadFactorFromAllClients()
.doOnSuccess(totalLoads -> this.clientThroughputShare.set(loadFactor / totalLoads))
.thenReturn(this);
.flatMap(totalLoads -> {
this.clientThroughputShare.set(loadFactor / totalLoads);
return Mono.just(this);
});
}

private double calculateLoadFactor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public ThroughputGroupLocalController(
@SuppressWarnings("unchecked")
public <T> Mono<T> init() {
return this.resolveRequestController()
.doOnSuccess(dummy -> {
this.throughputUsageCycleRenewTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe();
})
.thenReturn((T)this);
.then(Mono.fromRunnable(() -> {
this.throughputUsageCycleRenewTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe();
}))
.thenReturn((T) this);
}

@Override
Expand All @@ -42,6 +42,6 @@ public double getClientThroughputShare() {

@Override
public void recordThroughputUsage(double loadFactor) {
return;
// No-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ public <T> Mono<T> init() {
return Flux.fromIterable(this.globalEndpointManager.getReadEndpoints())
.flatMap(endpoint -> {
requestThrottlerMapByRegion.computeIfAbsent(endpoint, key -> new ThroughputRequestThrottler(this.scheduledThroughput.get()));
return Mono.empty();
})
.then(Mono.just((T)this));
return Mono.just((T)this);
}).single();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ public Mono<Void> close() {
@SuppressWarnings("unchecked")
public <T> Mono<T> init() {
return this.getPartitionKeyRanges(RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES)
.doOnSuccess(pkRanges -> {
.flatMap(pkRanges -> {
this.pkRanges = pkRanges;
this.createRequestThrottlers();
})
.then(Mono.just((T)this));
return Mono.just((T)this);
});
}

private void createRequestThrottlers() {
Expand Down