Implements a server for refreshing HDFS tokens, a part of secure HDFS support. #453
base: branch-2.2-kubernetes
Conversation
Perhaps @ifilonenko @foxish would be most interested. There is one sore point in the token refresh server prototype: I used the akka library for its message-driven actor thread model, which I think simplified the code a lot. (akka is the standard actor library for Scala.) But this is not a good thing to do inside Spark itself, which moved away from akka for dependency management reasons. See details at http://apache-spark-developers-list.1001551.n3.nabble.com/Why-did-spark-switch-from-AKKA-to-net-td21522.html.
But it's a fine thing to do for the token refresh server, which is an independent service app and has little to do with Spark. So I think it's best for the token refresh server to live outside the Spark repo. Or, if that's too much work, keep it as an independent project with its own pom.xml and no dependencies to/from Spark. Thoughts?
```scala
private object TokenRefreshService {

  def apply(system: ActorSystem): ActorRef = {
    UserGroupInformation.loginUserFromKeytab(
```
Reminder to myself. We'll have to refresh the Kerberos TGT periodically as well.
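One way to do that (just a sketch; the schedule and the relogin call below are not part of this PR) is a periodic task around Hadoop UGI's keytab relogin helper:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.hadoop.security.UserGroupInformation

// Periodically re-login from the keytab. checkTGTAndReloginFromKeytab is a
// no-op unless the TGT is close to expiring, so a coarse schedule is fine.
val tgtRefresher = Executors.newSingleThreadScheduledExecutor()
tgtRefresher.scheduleAtFixedRate(new Runnable {
  override def run(): Unit =
    UserGroupInformation.getLoginUser.checkTGTAndReloginFromKeytab()
}, 1, 1, TimeUnit.HOURS)
```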
Components 2 and 3 seem to serve similar purposes. Why do we need a periodic task to scan secrets, given that we can use the watch API to receive secret-related events?
Glad you asked. Thanks for the question, @liyinan926. The scanner is needed for two cases: bootstrapping and recovery (see the sketch below). I'll update the PR description as well.
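For reference, a minimal sketch of one scanner pass with the fabric8 client (`StartRefresh` and the label come from this PR; the method and parameter names here are illustrative):

```scala
import scala.collection.JavaConverters._
import akka.actor.ActorRef
import io.fabric8.kubernetes.client.KubernetesClient

// One scanner pass: list every secret carrying the well-known label, so that
// secrets created while the server was down (bootstrapping), or whose watch
// events were missed (recovery), still get renew tasks scheduled.
def scanOnce(client: KubernetesClient, renewService: ActorRef): Unit = {
  val labeled = client.secrets()
    .inAnyNamespace()
    .withLabel("refresh-hadoop-tokens", "yes")
    .list()
  for (secret <- labeled.getItems.asScala) {
    renewService ! StartRefresh(secret)
  }
}
```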
@kimoonkim Thanks for the clarification. Makes sense to me.
@ifilonenko and I talked last Thursday in a meeting. Two improvements were suggested.
The refresh server code now exists as an independent project with its own pom.xml. This is to avoid including akka in the Spark distribution.
Now the refresh server has a Dockerfile and a k8s deployment yaml file. There is a Kerberos login issue. Debugging it now.
The login issue was caused by the Dockerfile not putting the hadoop conf dir in the classpath. Fixed in the latest commit. And confirmed the refresh server works as expected. I'll now address other issues.
Thanks for the review, @liyinan926. Addressed the comments. PTAL.
A couple of minor comments. Generally LGTM. Thanks!
```diff
@@ -69,7 +69,12 @@ private class TokenRefreshService(kubernetesClient: KubernetesClient) extends Ac
     scheduler.scheduleOnce(Duration(0L, TimeUnit.MILLISECONDS), task)
   }

-  private def addStarterTask(secret: Secret) = {
+  private def startRefresh(secret: Secret) = {
```
Suggest renaming this to `startRefreshTask`.
```scala
private def maybeObtainNewToken(token: Token[_ <: TokenIdentifier], expireTime: Long,
    nowMills: Long) = {
  val maybeNewToken = if (token.getKind.equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) {
```
Got it.
This is overall nearing completion. I have included my comments below. However, IMO considerable unit and integration testing is still necessary.
|
||
Below is a list of the submodules for this cluster manager and what they do. | ||
|
||
* `core`: Implementation of the Kubernetes cluster manager support. | ||
* `token-refresh-server`: Extra Kubernetes service that refreshes Hadoop | ||
tokens for long-running Spark jobs accessing secure data sources like |
Suggest changing `accessing` to `that access`.
Fixed.
```dockerfile
    mkdir -p /opt/token-refresh-server/jars && \
    mkdir -p /opt/token-refresh-server/work-dir

ADD target/token-refresh-server-kubernetes_2.11-2.2.0-k8s-0.3.0-SNAPSHOT-assembly.tar.gz \
```
Deprecated version; update to the newest k8s assembly jar.
```scala
action match {
  case Action.ADDED =>
    logInfo(s"Found ${secret.getMetadata.getSelfLink} added")
    renewService ! StartRefresh(secret)
```
Agreed. Filtering shouldn't be done in receive, since it would block.
```scala
    FilterWatchListDeletable[Secret, SecretList, lang.Boolean, Watch, Watcher[Secret]] = {
  kubernetesClient
    .secrets()
    .inAnyNamespace()
```
As was specified before, `inAnyNamespace()` should be customized per use case. I can send a PR upwards for this if you don't have the cycles to add that feature.
Fixed. PTAL and see if the fix is what you expected.
The latest commit addresses this by switching between `inAnyNamespace()` and `inNamespace()` depending on a config key.
I have a question, though. When it uses `inNamespace()`, do we want the namespace value to be the current namespace that the refresh server is running in? If that's what most people want, then we should support it, I think.
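For illustration, a sketch of that switch (the `Option`-valued config and the use of `list()` instead of a watch are simplifications, not this PR's exact code):

```scala
import io.fabric8.kubernetes.api.model.SecretList
import io.fabric8.kubernetes.client.KubernetesClient

// Scope the secret listing to a single namespace when one is configured,
// otherwise cover the whole cluster.
def listScopedSecrets(client: KubernetesClient,
                      maybeNamespace: Option[String]): SecretList =
  maybeNamespace match {
    case Some(ns) => client.secrets().inNamespace(ns).list()
    case None     => client.secrets().inAnyNamespace().list()
  }
```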
I guess. Might be easier to just pass in the name of the namespace. I guess three options: `anyNamespace()`, `inNamespace()`, and `withNamespace(config.namespace)`.
```scala
package object constants {

  val REFRESH_SERVER_KERBEROS_PRINCIPAL = "kimoonkim"
```
Interesting principal here ;) Is this for testing?
Yes, only for testing. We should make this configurable.
Fixed.
```scala
val hadoopTokenPattern = Pattern.compile(SECRET_DATA_ITEM_KEY_REGEX_HADOOP_TOKENS)

def apply(system: ActorSystem, kubernetesClient: KubernetesClient): ActorRef = {
  UserGroupInformation.loginUserFromKeytab(
```
How is this being used?
`loginUserFromKeytab` sets the login user of the application as a static singleton field. So it gets used later, when creating the proxy user to obtain a new job user token.
```scala
    nowMillis: Long): Unit = {
  val durationUntilExpire = tokenToExpire.values.min - nowMillis
  val key = s"$SECRET_DATA_ITEM_KEY_PREFIX_HADOOP_TOKENS$nowMillis-$durationUntilExpire"
  val credentials = new Credentials()
```
Should this be blank? Or should this take in the user's Hadoop conf from their hdfs-site.xml?
I don't think the `Credentials` constructor takes in a Hadoop config object.
```scala
  realUser.toString
}
val credentials = new Credentials()
val ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser)
```
Why create a proxy user instead of just using `UGI.getLoginUser`?
The refresh server is designed to serve multiple Spark jobs. These jobs may come from many different users. The refresh server has its own headless Kerberos account as the login user, which is `UGI.getLoginUser`. On the other hand, the HDFS tokens it creates belong to the job user accounts, because the job's driver and executors should use them to access the job user's data. The proxy user allows the refresh server to use its own account to obtain job user tokens. If it used the login user, the obtained tokens would not be able to access the job user's data.
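A condensed sketch of that flow (the principal, keytab path, and helper name are illustrative; the UGI and FileSystem calls are standard Hadoop APIs, and the namenode must also be configured to allow the proxying):

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// The refresh server logs in once with its own headless principal...
UserGroupInformation.loginUserFromKeytab(
  "refresh-server@EXAMPLE.COM", "/mnt/secrets/krb5.keytab")

// ...and obtains HDFS tokens on behalf of each job user via a proxy UGI.
def obtainTokensFor(jobUser: String, conf: Configuration): Credentials = {
  val proxyUgi = UserGroupInformation.createProxyUser(
    jobUser, UserGroupInformation.getLoginUser)
  val creds = new Credentials()
  proxyUgi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = {
      // The tokens are owned by jobUser but name the refresh server's
      // login user as the renewer.
      FileSystem.get(conf).addDelegationTokens(
        UserGroupInformation.getLoginUser.getUserName, creds)
    }
  })
  creds
}
```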
```
kubectl create secret generic hadoop-token-refresh-server-kerberos-keytab \
    --from-file /mnt/secrets/krb5.keytab
```

5. Create a k8s `service account` and `clusterrolebinding` that the service pod will use.
Note this as optional.
Thanks for the review @ifilonenko. Yes, I plan to add unit tests before this gets merged in. But I hope we can delay integration tests until the other PR is merged in, so that we can verify end-to-end in the integration test.
I have been adding unit tests. I am getting there, but still need to add tests for renew tasks.
Reminder to myself. I found an edge case that the current code does not handle. If a refresh server was down for a while, say a few days, and then brought back up, the namenode may have removed some expired tokens in the meantime. It will return the following error when the refresh server tries to renew the token in the orphaned secret. We should think about how to fix this:

```
org.apache.hadoop.security.token.SecretManager$InvalidToken: Renewal request for unknown token
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Renewal request for unknown token
```
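One possible fix, sketched below rather than implemented in this PR, is to treat `InvalidToken` during renewal as a signal to obtain a brand new token instead:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.token.{SecretManager, Token}

// If the namenode no longer knows the token (it expired while the refresh
// server was down), fall back to obtaining a replacement. In practice the
// error may arrive wrapped in a RemoteException and need unwrapping first.
def renewOrReplace(token: Token[_], conf: Configuration,
                   obtainNewToken: () => Token[_]): Token[_] =
  try {
    token.renew(conf)
    token
  } catch {
    case _: SecretManager.InvalidToken => obtainNewToken()
  }
```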
I added unit tests for all major components and their common-case code paths. (Error-case code paths are not covered yet.) PTAL.
Will be doing another pass over the unit tests. Thank you for this!
Yes, please take a look at the unit tests. Also, please note that unit tests are not run yet by CI. I'll see what I can do about that.
Running unit tests from the top-level directory fails. Seems like dependency issues. I need to investigate.
@ifilonenko @liyinan926
Implements a standalone server, named the Hadoop token refresh server, that extends the lifetimes of HDFS tokens stored in K8s Secrets. This is part of the secure HDFS support.
HDFS tokens expire every 24 hours, or after some configured time period. They can be renewed up to a max lifetime of 7 days; after that, brand new tokens must be obtained.
This refresh server takes care of both short-term token renewal and obtaining new tokens.
The server runs in a k8s pod. There is a `Dockerfile` for building docker images and a k8s `deployment` yaml file for launching the pod. This code exists in a separate maven project with its own pom.xml file.
You can build a tarball using maven. For details, see README.md.
The refresh server has three parallel components:

1. `Token Refresh Service`: An actor-based service that takes discovered secrets as input and schedules tasks to renew the tokens in the secrets. The other components below send commands to this service.
2. `Secret Scanner`: A periodic thread that scans and discovers secrets that have a well-known label key-value pair, `refresh-hadoop-tokens: "yes"`.
3. `Secret Watcher`: An event watcher that discovers new or deleted secrets that have the said label.

Although its role overlaps with the secret watcher, the secret scanner is needed for bootstrapping and recovery.
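A skeletal view of how the components interact (`StartRefresh` appears in this PR; `StopRefresh` and the message-handler bodies here are illustrative):

```scala
import akka.actor.Actor
import io.fabric8.kubernetes.api.model.Secret

case class StartRefresh(secret: Secret)
case class StopRefresh(secret: Secret)

// Both the scanner and the watcher send commands to this actor. The actor
// model serializes the messages, so no explicit locking is needed.
class TokenRefreshService extends Actor {
  // Secrets that currently have a renew task scheduled.
  private var tracked = Set.empty[String]

  override def receive: Receive = {
    case StartRefresh(secret) =>
      tracked += secret.getMetadata.getSelfLink
      // A real implementation would schedule a renew task here...
    case StopRefresh(secret) =>
      tracked -= secret.getMetadata.getSelfLink
      // ...and cancel the task here.
  }
}
```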
Each renew task is in charge of one or more tokens stored in a K8s secret. When it runs, the task renews the tokens, obtains brand new tokens for those nearing the max lifetime, and writes the result back to the secret (see the sketch below).
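For the renewal step itself, a minimal sketch using Hadoop's `Token.renew` (scheduling and the secret write-back are omitted):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.token.Token

// Renew every token in a secret and return the earliest new expiration time,
// which determines when this task should run next.
def renewAll(tokens: Seq[Token[_]], conf: Configuration): Long =
  tokens.map(_.renew(conf)).min
```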
Tested manually so far. (I'll add unit tests in this PR. I plan to add integration tests later, when both the other secure HDFS PR and this PR are merged in.)
Here's the log that shows how a brand new token is obtained:
After the above run, the K8s secret now has two data items, with the latest item containing the new token: