Skip to content

Commit

Permalink
[KYUUBI apache#6754] Improve the performance of ranger access requests
Browse files Browse the repository at this point in the history
  • Loading branch information
wankunde committed Oct 18, 2024
1 parent 2d64255 commit e198179
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 6 deletions.
13 changes: 13 additions & 0 deletions extensions/spark/kyuubi-spark-authz/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,19 @@
<artifactId>${hudi.artifact}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatestplus</groupId>
<artifactId>mockito-4-11_${scala.binary.version}</artifactId>
<version>${scalatestplus.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.kyuubi.plugin.spark.authz.ranger

import scala.collection.mutable.ArrayBuffer
import scala.collection.immutable.HashMap

import org.apache.ranger.plugin.policyengine.RangerAccessRequest
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.kyuubi.plugin.spark.authz._
import org.apache.kyuubi.plugin.spark.authz.ObjectType._
import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType
import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._
import org.apache.kyuubi.plugin.spark.authz.rule.Authorization
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
Expand All @@ -34,23 +35,22 @@ case class RuleAuthorization(spark: SparkSession) extends Authorization(spark) {
val auditHandler = new SparkRangerAuditHandler
val ugi = getAuthzUgi(spark.sparkContext)
val (inputs, outputs, opType) = PrivilegesBuilder.build(plan, spark)
val requests = new ArrayBuffer[AccessRequest]()
var requests = new HashMap[(AccessResource, AccessType), AccessRequest]()

def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = {
objects.foreach { obj =>
val resource = AccessResource(obj, opType)
val accessType = ranger.AccessType(obj, opType, isInput)
if (accessType != AccessType.NONE && !requests.exists(o =>
o.accessType == accessType && o.getResource == resource)) {
requests += AccessRequest(resource, ugi, opType, accessType)
if (accessType != AccessType.NONE) {
requests += (resource, accessType) -> AccessRequest(resource, ugi, opType, accessType)
}
}
}

addAccessRequest(inputs, isInput = true)
addAccessRequest(outputs, isInput = false)

val requestArrays = requests.map { request =>
val requestArrays = requests.values.map { request =>
val resource = request.getResource.asInstanceOf[AccessResource]
resource.objectType match {
case ObjectType.COLUMN if resource.getColumns.nonEmpty =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.spark.authz.ranger

import java.io.File
import java.nio.file.Files

import scala.reflect.io.Path.jfile2path

import org.apache.ranger.plugin.policyengine.RangerAccessRequest
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import org.scalatestplus.mockito.MockitoSugar
// scalastyle:off
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.BeforeAndAfterAll

import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider

class RuleAuthorizationSuite extends AnyFunSuite
with SparkSessionProvider with BeforeAndAfterAll with MockitoSugar {
// scalastyle:on
override protected val catalogImpl: String = "hive"

private var tempDir: File = _

override def beforeAll(): Unit = {
tempDir = Files.createTempDirectory("kyuubi-test-").toFile
}


override def afterAll(): Unit = {
if (tempDir != null) {
tempDir.deleteRecursively()
}
spark.stop()
super.afterAll()
}

// scalastyle:on
test("KYUUBI #6754: improve the performance of ranger access requests") {
val outputPath = tempDir.getAbsolutePath + "/small_files"
spark.range(1, 1000, 1, 1000).write.parquet(outputPath)

val plugin = mock[SparkRangerAdminPlugin.type]
when(plugin.verify(Seq(any[RangerAccessRequest]), any[SparkRangerAuditHandler]))
.thenAnswer(_ => ())

val df = spark.read.parquet(outputPath + "/*.parquet")
val plan = df.queryExecution.optimizedPlan
val start = System.currentTimeMillis()
RuleAuthorization(spark).checkPrivileges(spark, plan)
val end = System.currentTimeMillis()
assert(end - start < 10000, "RuleAuthorization.checkPrivileges() timed out")
}
}

0 comments on commit e198179

Please sign in to comment.