Skip to content

Commit

Permalink
Merge pull request #345 from RedisLabs/fix-328-acl-auth-v3
Browse files Browse the repository at this point in the history
Fix #328 acl auth in spark 3 and update jedis, spark version
  • Loading branch information
fe2s authored Jun 13, 2022
2 parents 8f6c447 + db23347 commit aca7908
Show file tree
Hide file tree
Showing 23 changed files with 272 additions and 74 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
name: install Redis
command: |
sudo apt-get -y update
sudo apt-get install -y gcc-8 g++-8 libssl-dev
sudo apt-get install -y libssl-dev
wget http://download.redis.io/releases/redis-6.0.10.tar.gz
tar -xzvf redis-6.0.10.tar.gz
make -C redis-6.0.10 -j`nproc` BUILD_TLS=yes
Expand Down Expand Up @@ -118,7 +118,7 @@ jobs:
name: install Redis
command: |
sudo apt-get -y update
sudo apt-get install -y gcc-8 g++-8 libssl-dev
sudo apt-get install -y libssl-dev
wget http://download.redis.io/releases/redis-6.0.10.tar.gz
tar -xzvf redis-6.0.10.tar.gz
make -C redis-6.0.10 -j`nproc` BUILD_TLS=yes
Expand Down
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# user
USER_ACL = user alice on >p1pp0 ~* +@all

# STANDALONE REDIS NODE
define REDIS_STANDALONE_NODE_CONF
daemonize yes
Expand All @@ -7,6 +10,7 @@ logfile /tmp/redis_standalone_node_for_spark-redis.log
save ""
appendonly no
requirepass passwd
$(USER_ACL)
endef

# STANDALONE REDIS NODE WITH SSL
Expand All @@ -18,6 +22,7 @@ logfile /tmp/redis_standalone_node_ssl_for_spark-redis.log
save ""
appendonly no
requirepass passwd
$(USER_ACL)
tls-auth-clients no
tls-port 6380
tls-cert-file ./src/test/resources/tls/redis.crt
Expand All @@ -30,6 +35,7 @@ endef
define REDIS_CLUSTER_NODE1_CONF
daemonize yes
port 7379
$(USER_ACL)
pidfile /tmp/redis_cluster_node1_for_spark-redis.pid
logfile /tmp/redis_cluster_node1_for_spark-redis.log
save ""
Expand All @@ -41,6 +47,7 @@ endef
define REDIS_CLUSTER_NODE2_CONF
daemonize yes
port 7380
$(USER_ACL)
pidfile /tmp/redis_cluster_node2_for_spark-redis.pid
logfile /tmp/redis_cluster_node2_for_spark-redis.log
save ""
Expand All @@ -52,6 +59,7 @@ endef
define REDIS_CLUSTER_NODE3_CONF
daemonize yes
port 7381
$(USER_ACL)
pidfile /tmp/redis_cluster_node3_for_spark-redis.pid
logfile /tmp/redis_cluster_node3_for_spark-redis.log
save ""
Expand Down
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ Spark-Redis also supports Spark Streaming (DStreams) and Structured Streaming.
The library has several branches, each corresponds to a different supported Spark version. For example, 'branch-2.3' works with any Spark 2.3.x version.
The master branch contains the recent development for the next release.

| Spark-Redis | Spark | Redis | Supported Scala Versions |
| ----------------------------------------------------------------| ------------- | ---------------- | ------------------------ |
| [master](https://github.com/RedisLabs/spark-redis/) | 3.0.x | >=2.9.0 | 2.12 |
| [2.4, 2.5, 2.6](https://github.com/RedisLabs/spark-redis/tree/branch-2.4) | 2.4.x | >=2.9.0 | 2.11, 2.12 |
| [2.3](https://github.com/RedisLabs/spark-redis/tree/branch-2.3) | 2.3.x | >=2.9.0 | 2.11 |
| [1.4](https://github.com/RedisLabs/spark-redis/tree/branch-1.4) | 1.4.x | | 2.10 |
| Spark-Redis | Spark | Redis | Supported Scala Versions |
|---------------------------------------------------------------------------|-------| ---------------- | ------------------------ |
| [master](https://github.com/RedisLabs/spark-redis/) | 3.2.x | >=2.9.0 | 2.12 |
| [3.0](https://github.com/RedisLabs/spark-redis/tree/branch-3.0) | 3.0.x | >=2.9.0 | 2.12 |
| [2.4, 2.5, 2.6](https://github.com/RedisLabs/spark-redis/tree/branch-2.4) | 2.4.x | >=2.9.0 | 2.11, 2.12 |
| [2.3](https://github.com/RedisLabs/spark-redis/tree/branch-2.3) | 2.3.x | >=2.9.0 | 2.11 |
| [1.4](https://github.com/RedisLabs/spark-redis/tree/branch-1.4) | 1.4.x | | 2.10 |


## Known limitations
Expand Down
1 change: 1 addition & 0 deletions doc/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The supported configuration parameters are:
* `spark.redis.host` - host or IP of the initial node we connect to. The connector will read the cluster
topology from the initial node, so there is no need to provide the rest of the cluster nodes.
* `spark.redis.port` - the initial node's TCP redis port.
* `spark.redis.user` - the initial node's AUTH user
* `spark.redis.auth` - the initial node's AUTH password
* `spark.redis.db` - optional DB number. Avoid using this, especially in cluster mode.
* `spark.redis.timeout` - connection timeout in ms, 2000 ms by default
Expand Down
31 changes: 16 additions & 15 deletions doc/dataframe.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,22 +330,23 @@ root

## DataFrame options

| Name | Description | Type | Default |
| -----------------------| ------------------------------------------------------------------------------------------| --------------------- | ------- |
| model | defines the Redis model used to persist DataFrame, see [Persistence model](#persistence-model)| `enum [binary, hash]` | `hash` |
| filter.keys.by.type | make sure the underlying data structures match persistence model | `Boolean` | `false` |
| partitions.number | number of partitions (applies only when reading DataFrame) | `Int` | `3` |
| Name | Description | Type | Default |
|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------| --------------------- | ------- |
| model | defines the Redis model used to persist DataFrame, see [Persistence model](#persistence-model) | `enum [binary, hash]` | `hash` |
| filter.keys.by.type | make sure the underlying data structures match persistence model | `Boolean` | `false` |
| partitions.number | number of partitions (applies only when reading DataFrame) | `Int` | `3` |
| key.column | when writing - specifies unique column used as a Redis key, by default a key is auto-generated <br/> when reading - specifies column name to store hash key | `String` | - |
| ttl | data time to live in `seconds`. Data doesn't expire if `ttl` is less than `1` | `Int` | `0` |
| infer.schema | infer schema from random row, all columns will have `String` type | `Boolean` | `false` |
| max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 |
| scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 |
| iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 |
| host | overrides `spark.redis.host` configured in SparkSession | `String` | `localhost` |
| port | overrides `spark.redis.port` configured in SparkSession | `Int` | `6379` |
| auth | overrides `spark.redis.auth` configured in SparkSession | `String` | - |
| dbNum | overrides `spark.redis.db` configured in SparkSession | `Int` | `0` |
| timeout | overrides `spark.redis.timeout` configured in SparkSession | `Int` | `2000` |
| ttl | data time to live in `seconds`. Data doesn't expire if `ttl` is less than `1` | `Int` | `0` |
| infer.schema | infer schema from random row, all columns will have `String` type | `Boolean` | `false` |
| max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 |
| scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 |
| iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 |
| host | overrides `spark.redis.host` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | `localhost` |
| port | overrides `spark.redis.port` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `6379` |
| user | overrides `spark.redis.user` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | - |
| auth | overrides `spark.redis.auth` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | - |
| dbNum | overrides `spark.redis.db` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `0` |
| timeout | overrides `spark.redis.timeout` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `2000` |


## Known limitations
Expand Down
11 changes: 3 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
<name>Spark-Redis</name>
<description>A Spark library for Redis</description>
<url>http://github.com/RedisLabs/spark-redis</url>
Expand Down Expand Up @@ -49,8 +49,8 @@
<java.version>1.8</java.version>
<scala.major.version>2.12</scala.major.version>
<scala.complete.version>${scala.major.version}.0</scala.complete.version>
<jedis.version>3.4.1</jedis.version>
<spark.version>3.0.1</spark.version>
<jedis.version>3.9.0</jedis.version>
<spark.version>3.2.1</spark.version>
<plugins.scalatest.version>1.0</plugins.scalatest.version>
</properties>

Expand Down Expand Up @@ -271,11 +271,6 @@
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.redislabs.provider.redis

import redis.clients.jedis.{JedisPoolConfig, Jedis, JedisPool}
import redis.clients.jedis.exceptions.JedisConnectionException
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import java.time.Duration
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConversions._


Expand All @@ -21,11 +21,11 @@ object ConnectionPool {
poolConfig.setTestOnBorrow(false)
poolConfig.setTestOnReturn(false)
poolConfig.setTestWhileIdle(false)
poolConfig.setMinEvictableIdleTimeMillis(60000)
poolConfig.setTimeBetweenEvictionRunsMillis(30000)
poolConfig.setSoftMinEvictableIdleTime(Duration.ofMinutes(1))
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(30))
poolConfig.setNumTestsPerEvictionRun(-1)

new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
new JedisPool(poolConfig, re.host, re.port, re.timeout, re.user, re.auth, re.dbNum, re.ssl)
}
)
var sleepTime: Int = 4
Expand Down
50 changes: 35 additions & 15 deletions src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import scala.collection.JavaConversions._
*
* @param host the redis host or ip
* @param port the redis port
* @param user the authentication username
* @param auth the authentication password
* @param dbNum database number (should be avoided in general)
* @param ssl true to enable SSL connection. Defaults to false
*/
case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
port: Int = Protocol.DEFAULT_PORT,
user: String = null,
auth: String = null,
dbNum: Int = Protocol.DEFAULT_DATABASE,
timeout: Int = Protocol.DEFAULT_TIMEOUT,
Expand All @@ -36,6 +38,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
this(
conf.get("spark.redis.host", Protocol.DEFAULT_HOST),
conf.getInt("spark.redis.port", Protocol.DEFAULT_PORT),
conf.get("spark.redis.user", null),
conf.get("spark.redis.auth", null),
conf.getInt("spark.redis.db", Protocol.DEFAULT_DATABASE),
conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT),
Expand All @@ -54,6 +57,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
this(
parameters.getOrElse("host", conf.get("spark.redis.host", Protocol.DEFAULT_HOST)),
parameters.getOrElse("port", conf.get("spark.redis.port", Protocol.DEFAULT_PORT.toString)).toInt,
parameters.getOrElse("user", conf.get("spark.redis.user", null)),
parameters.getOrElse("auth", conf.get("spark.redis.auth", null)),
parameters.getOrElse("dbNum", conf.get("spark.redis.db", Protocol.DEFAULT_DATABASE.toString)).toInt,
parameters.getOrElse("timeout", conf.get("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT.toString)).toInt,
Expand All @@ -64,17 +68,17 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
/**
* Constructor with Jedis URI
*
* @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
* @param uri connection URI in the form of redis://$user:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
*/
def this(uri: URI) {
this(uri.getHost, uri.getPort, JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri),
this(uri.getHost, uri.getPort, JedisURIHelper.getUser(uri), JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri),
Protocol.DEFAULT_TIMEOUT, uri.getScheme == RedisSslScheme)
}

/**
* Constructor with Jedis URI from String
*
* @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
* @param uri connection URI in the form of redis://$user:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
*/
def this(uri: String) {
this(URI.create(uri))
Expand Down Expand Up @@ -280,8 +284,14 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val port = replinfo.filter(_.contains("master_port:"))(0).trim.substring(12).toInt

//simply re-enter this function witht he master host/port
getNonClusterNodes(initialHost = new RedisEndpoint(host, port,
initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl))
getNonClusterNodes(initialHost = RedisEndpoint(
host = host,
port = port,
user = initialHost.user,
auth = initialHost.auth,
dbNum = initialHost.dbNum,
ssl = initialHost.ssl
))

} else {
//this is a master - take its slaves
Expand All @@ -295,10 +305,17 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {

val nodes = master +: slaves
val range = nodes.length
(0 until range).map(i =>
RedisNode(RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum,
initialHost.timeout, initialHost.ssl),
0, 16383, i, range)).toArray
(0 until range).map(i => {
val endpoint = RedisEndpoint(
host = nodes(i)._1,
port = nodes(i)._2,
user = initialHost.user,
auth = initialHost.auth,
dbNum = initialHost.dbNum,
timeout = initialHost.timeout,
ssl = initialHost.ssl)
RedisNode(endpoint, 0, 16383, i, range)
}).toArray
}
}

Expand Down Expand Up @@ -326,12 +343,15 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val node = slotInfo(i + 2).asInstanceOf[java.util.List[java.lang.Object]]
val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]])
val port = node.get(1).toString.toInt
RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum,
initialHost.timeout, initialHost.ssl),
sPos,
ePos,
i,
slotInfo.size - 2)
val endpoint = RedisEndpoint(
host = host,
port = port,
user = initialHost.user,
auth = initialHost.auth,
dbNum = initialHost.dbNum,
timeout = initialHost.timeout,
ssl = initialHost.ssl)
RedisNode(endpoint, sPos, ePos, i, slotInfo.size - 2)
})
}
}.toArray
Expand Down
Loading

0 comments on commit aca7908

Please sign in to comment.