Skip to content

Commit

Permalink
[feat][broker]: support missing broker level fine-granted permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Nov 25, 2024
1 parent 387a96d commit e78c8e1
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;

Expand Down Expand Up @@ -383,4 +377,13 @@ default CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(Name
String.format("getPermissionsAsync on namespaceName %s is not supported by the Authorization",
namespaceName)));
}

default CompletableFuture<Boolean> allowBrokerOperationAsync(String clusterName,
String brokerId,
BrokerOperation brokerOperation,
String role,
AuthenticationDataSource authData) {
return FutureUtil.failedFuture(
new UnsupportedOperationException("allowBrokerOperationAsync is not supported yet."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.core.Response;
Expand All @@ -37,13 +38,8 @@
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -544,6 +540,28 @@ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
}
}

public CompletableFuture<Boolean> allowBrokerOperationAsync(String clusterName,
String brokerId,
BrokerOperation brokerOperation,
String originalRole,
String role,
AuthenticationDataSource authData) {
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}

if (isProxyRole(role)) {
final var isRoleAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId,
brokerOperation, role, authData);
final var isOriginalAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId,
brokerOperation, originalRole, authData);
return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
(isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized);
} else {
return provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData);
}
}

/**
* @deprecated - will be removed after 2.12. Use async variant.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.BrokerOperation;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ThreadDumpUtil;
Expand Down Expand Up @@ -107,7 +109,7 @@ public class BrokersBase extends AdminResource {
@ApiResponse(code = 404, message = "Cluster does not exist: cluster={clustername}") })
public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse,
@PathParam("cluster") String cluster) {
validateSuperUserAccessAsync()
validateBrokerOperationAsync(cluster, pulsar().getBrokerId(), BrokerOperation.LIST_BROKERS)
.thenCompose(__ -> validateClusterOwnershipAsync(cluster))
.thenCompose(__ -> pulsar().getLoadManager().get().getAvailableBrokersAsync())
.thenAccept(activeBrokers -> {
Expand Down Expand Up @@ -148,7 +150,9 @@ public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse) throw
@ApiResponse(code = 403, message = "This operation requires super-user access"),
@ApiResponse(code = 404, message = "Leader broker not found") })
public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) {
validateSuperUserAccessAsync().thenAccept(__ -> {
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(),
pulsar().getBrokerId(), BrokerOperation.GET_LEADER_BROKER)
.thenAccept(__ -> {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader()
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
BrokerInfo brokerInfo = BrokerInfo.builder()
Expand All @@ -175,7 +179,8 @@ public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) {
public void getOwnedNamespaces(@Suspended final AsyncResponse asyncResponse,
@PathParam("clusterName") String cluster,
@PathParam("brokerId") String brokerId) {
validateSuperUserAccessAsync()
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(),
pulsar().getBrokerId(), BrokerOperation.LIST_OWNED_NAMESPACES)
.thenCompose(__ -> maybeRedirectToBroker(brokerId))
.thenCompose(__ -> validateClusterOwnershipAsync(cluster))
.thenCompose(__ -> pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync())
Expand Down Expand Up @@ -204,7 +209,8 @@ public void getOwnedNamespaces(@Suspended final AsyncResponse asyncResponse,
public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse,
@PathParam("configName") String configName,
@PathParam("configValue") String configValue) {
validateSuperUserAccessAsync()
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.UPDATE_DYNAMIC_CONFIGURATION)
.thenCompose(__ -> persistDynamicConfigurationAsync(configName, configValue))
.thenAccept(__ -> {
LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
Expand All @@ -228,7 +234,8 @@ public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse,
public void deleteDynamicConfiguration(
@Suspended AsyncResponse asyncResponse,
@PathParam("configName") String configName) {
validateSuperUserAccessAsync()
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.DELETE_DYNAMIC_CONFIGURATION)
.thenCompose(__ -> internalDeleteDynamicConfigurationOnMetadataAsync(configName))
.thenAccept(__ -> {
LOG.info("[{}] Successfully to delete dynamic configuration {}", clientAppId(), configName);
Expand All @@ -249,7 +256,8 @@ public void deleteDynamicConfiguration(
@ApiResponse(code = 404, message = "Configuration not found"),
@ApiResponse(code = 500, message = "Internal server error")})
public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.LIST_DYNAMIC_CONFIGURATIONS)
.thenCompose(__ -> dynamicConfigurationResources().getDynamicConfigurationAsync())
.thenAccept(configOpt -> asyncResponse.resume(configOpt.orElseGet(Collections::emptyMap)))
.exceptionally(ex -> {
Expand All @@ -266,7 +274,8 @@ public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "You don't have admin permission to get configuration")})
public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.LIST_DYNAMIC_CONFIGURATIONS)
.thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getDynamicConfiguration()))
.exceptionally(ex -> {
LOG.error("[{}] Failed to get all dynamic configuration names.", clientAppId(), ex);
Expand All @@ -281,7 +290,8 @@ public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse)
response = String.class, responseContainer = "Map")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void getRuntimeConfiguration(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.LIST_RUNTIME_CONFIGURATIONS)
.thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getRuntimeConfiguration()))
.exceptionally(ex -> {
LOG.error("[{}] Failed to get runtime configuration.", clientAppId(), ex);
Expand Down Expand Up @@ -322,7 +332,8 @@ private synchronized CompletableFuture<Void> persistDynamicConfigurationAsync(
@ApiOperation(value = "Get the internal configuration data", response = InternalConfigurationData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void getInternalConfigurationData(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.GET_INTERNAL_CONFIGURATION_DATA)
.thenAccept(__ -> asyncResponse.resume(pulsar().getInternalConfigurationData()))
.exceptionally(ex -> {
LOG.error("[{}] Failed to get internal configuration data.", clientAppId(), ex);
Expand All @@ -339,7 +350,8 @@ public void getInternalConfigurationData(@Suspended AsyncResponse asyncResponse)
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 500, message = "Internal server error")})
public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.CHECK_BACKLOG_QUOTA)
.thenAcceptAsync(__ -> {
pulsar().getBrokerService().monitorBacklogQuota();
asyncResponse.resume(Response.noContent().build());
Expand Down Expand Up @@ -378,7 +390,8 @@ public void healthCheck(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Topic Version")
@QueryParam("topicVersion") TopicVersion topicVersion,
@QueryParam("brokerId") String brokerId) {
validateSuperUserAccessAsync()
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.HEALTH_CHECK)
.thenAccept(__ -> checkDeadlockedThreads())
.thenCompose(__ -> maybeRedirectToBroker(
StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId))
Expand Down Expand Up @@ -596,8 +609,9 @@ public void shutDownBrokerGracefully(
@QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean forcedTerminateTopic,
@Suspended final AsyncResponse asyncResponse
) {
validateSuperUserAccess();
doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec, forcedTerminateTopic)
validateBrokerOperationAsync(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.SHUTDOWN)
.thenCompose(__ -> doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec, forcedTerminateTopic))
.thenAccept(__ -> {
LOG.info("[{}] Successfully shutdown broker gracefully", clientAppId());
asyncResponse.resume(Response.noContent().build());
Expand All @@ -614,5 +628,31 @@ private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int maxConcurren
pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
return pulsar().closeAsync();
}


private CompletableFuture<Void> validateBrokerOperationAsync(String cluster, String brokerId, BrokerOperation operation) {
final var pulsar = pulsar();
if (pulsar.getBrokerService().isAuthorizationEnabled()) {
return pulsar.getBrokerService().getAuthorizationService()
.allowBrokerOperationAsync(cluster, brokerId, operation, originalPrincipal(), clientAppId(), clientAuthData())
.thenAccept(isAuthorized -> {
if (!isAuthorized) {
throw new RestException(Status.UNAUTHORIZED,
String.format("Unauthorized to validateTenantOperation for"
+ " originalPrincipal [%s] and clientAppId [%s] "
+ "about operation [%s] on broker [%s]",
originalPrincipal(), clientAppId(), operation.toString(), brokerId));
}
}).exceptionallyCompose(ex -> {
// Fallback to superuser validation for compatibility purposes.
final var rc = FutureUtil.unwrapCompletionException(ex);
if (rc instanceof UnsupportedOperationException) {
return validateSuperUserAccessAsync();
}
throw new CompletionException(ex);
});
}
return CompletableFuture.completedFuture(null);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.pulsar.common.policies.data;

public enum BrokerOperation {
LIST_BROKERS,
GET_BROKER,

GET_LEADER_BROKER,
LIST_OWNED_NAMESPACES,

LIST_DYNAMIC_CONFIGURATIONS,
UPDATE_DYNAMIC_CONFIGURATION,
DELETE_DYNAMIC_CONFIGURATION,

LIST_RUNTIME_CONFIGURATIONS,

GET_INTERNAL_CONFIGURATION_DATA,

CHECK_BACKLOG_QUOTA,
HEALTH_CHECK,
SHUTDOWN
}

0 comments on commit e78c8e1

Please sign in to comment.