This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Implements a server for refreshing HDFS tokens, a part of secure HDFS support. #453

Open · wants to merge 43 commits into branch-2.2-kubernetes

Conversation

@kimoonkim (Member) commented on Aug 22, 2017

@ifilonenko @liyinan926

Implements a standalone server, named the Hadoop token refresh server, that extends the lifetimes of HDFS tokens stored in K8s Secrets. This is part of the secure HDFS support.

HDFS tokens expire every 24 hours, or after a configured time period. They can be renewed repeatedly until they reach their max lifetime of 7 days, after which brand-new tokens must be obtained.

This refresh server takes care of both the short-term token renewal and obtaining replacement tokens.

The server runs in a k8s pod. There is a Dockerfile for building docker images and a k8s deployment yaml file for launching the pod.

This code exists in a separate maven project with its own pom.xml file.
You can build a tarball using:

$ mvn clean package

For details, see README.md

The refresh server has three parallel components:

  1. Token Refresh Service: An actor-based service that takes discovered secrets as input and schedules tasks to renew the tokens in those secrets. The other two components send commands to this service.
  2. Secret Scanner: A periodic thread that scans for secrets that carry the well-known label key-value pair refresh-hadoop-tokens: "yes" (see the sketch after this list).
  3. Secret Watcher: An event watcher that discovers new or deleted secrets that carry the same label.
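For illustration, a minimal sketch of the scanner's label-based discovery, assuming the fabric8 Kubernetes client used elsewhere in this project; the object and method names here are illustrative, not the actual classes in this PR:

import io.fabric8.kubernetes.client.KubernetesClient

import scala.collection.JavaConverters._

object SecretScannerSketch {

  // Lists all secrets, in any namespace, that carry the well-known label.
  def scanOnce(client: KubernetesClient): Unit = {
    val secrets = client.secrets()
      .inAnyNamespace()
      .withLabel("refresh-hadoop-tokens", "yes")
      .list()
      .getItems.asScala
    secrets.foreach(secret =>
      println(s"Scanned ${secret.getMetadata.getSelfLink}"))
  }
}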

Although its role overlaps with the secret watcher, the secret scanner is needed for bootstrapping and recovery:

  • When the refresh server starts for the first time, or restarts after crashing. The watcher would not inform the refresh server of secrets that were added before the refresh server started.
  • When the watcher connection to the K8s API server is intermittently unstable and the refresh server misses some events as a result. The periodic scanning eventually discovers secrets added or deleted during the bad period.

Each renew task is in charge of one or more tokens stored in a K8s secret. When it runs, the task does the following (a sketch follows the list):

  1. Renews tokens for another 24 hours if they are about to expire.
  2. Optionally, checks whether some tokens are about to reach their 7-day max lifetime. If so, obtains replacement tokens for them and writes a snapshot of all tokens back to the associated K8s secret.
  3. Schedules a new task near the next expiration time by sending a command to the token refresh service.
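For illustration, a hedged sketch of the renew-or-replace decision, using the 24-hour renew and 7-day max-lifetime numbers above. Token.renew is the real Hadoop API; the threshold value and the names here are assumptions:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

object RenewTaskSketch {

  // Renew or replace when less than this much lifetime remains (assumed value).
  private val renewThresholdMillis = 60 * 60 * 1000L

  // Returns the next expire time to schedule the follow-up task against.
  def refresh(token: Token[_ <: TokenIdentifier],
              expireTime: Long,
              maxTime: Long,
              nowMillis: Long,
              conf: Configuration): Long = {
    if (maxTime - nowMillis < renewThresholdMillis) {
      // Step 2: near the 7-day max lifetime. A real task would obtain a
      // brand-new replacement token as a proxy user and write a snapshot of
      // all tokens back to the K8s secret.
      maxTime
    } else if (expireTime - nowMillis < renewThresholdMillis) {
      // Step 1: renew for another 24 hours; Token.renew returns the new
      // expire time.
      token.renew(conf)
    } else {
      // Step 3 happens in the caller: schedule the next task near this time.
      expireTime
    }
  }
}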

Tested manually so far. (I'll add unit tests in this PR. I plan to add integration tests later, once both the other secure HDFS PR and this PR are merged in.)

Here's the log that shows how a brand new token is obtained:

17/10/05 13:48:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/10/05 13:48:09 INFO UserGroupInformation: Login successful for user kimoonkim using keytab file /mnt/secrets/krb5.keytab
17/10/05 13:48:19 INFO SecretScanner: Scanned /api/v1/namespaces/default/secrets/spark.kubernetes.kerberos.dt
17/10/05 13:48:19 INFO TokenRefreshService: Started refresh of tokens in /api/v1/namespaces/default/secrets/spark.kubernetes.kerberos.dt with akka.actor.LightArrayRevolverScheduler$$anon$3@50bec29b
17/10/05 13:48:19 INFO StarterTask: Matching secret data hadoop-tokens-1504043833345-86400127, result true
17/10/05 13:48:19 INFO StarterTask: Read Hadoop tokens: Map(Kind: HDFS_DELEGATION_TOKEN, Service: 192.168.6.157:8020, Ident: (HDFS_DELEGATION_TOKEN token 9 for kimoonkim) -> 1504130233472)
17/10/05 13:48:19 INFO TokenRefreshService: Scheduling refresh of tokens with /api/v1/namespaces/default/secrets/spark.kubernetes.kerberos.dt at now + 0 millis.
17/10/05 13:48:19 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 15 for kimoonkim on 192.168.6.157:8020
17/10/05 13:48:19 INFO RenewTask: Obtained token Kind: HDFS_DELEGATION_TOKEN, Service: 192.168.6.157:8020, Ident: (HDFS_DELEGATION_TOKEN token 15 for kimoonkim)
17/10/05 13:48:20 INFO RenewTask: Wrote a new token to a2ba24a7-8c14-11e7-8c63-02f2c310e88c
17/10/05 13:48:20 INFO RenewTask: Renewed tokens Map(Kind: HDFS_DELEGATION_TOKEN, Service: 192.168.6.157:8020, Ident: (HDFS_DELEGATION_TOKEN token 15 for kimoonkim) -> 1507322899852). Next expire time 1507322899852
17/10/05 13:48:20 INFO TokenRefreshService: Scheduling refresh of tokens with /api/v1/namespaces/default/secrets/spark.kubernetes.kerberos.dt at now + 77759847 millis.

After the above run, the K8s secret now has two data items, with the latest item containing the new token:

$ kubectl get secret -o json spark.kubernetes.kerberos.dt
{
    "apiVersion": "v1",
    "data": {
        "hadoop-tokens-1504043833345-86400127": "SERUUwABEjE5Mi4xNjguNi4xNTc6ODAyMDUAGGtpbW9vbmtpbUBQRVBQRVJEQVRBLkNPTQlraW1vb25raW0AigFeL/+XWYoBXlQMG1kJChQhrFzhamtHiO0PFswswtbjpaadaBVIREZTX0RFTEVHQVRJT05fVE9LRU4SMTkyLjE2OC42LjE1Nzo4MDIwAA==",
        "hadoop-tokens-1507236499185-86400667": "SERUUwABEjE5Mi4xNjguNi4xNTc6ODAyMDUAGGtpbW9vbmtpbUBQRVBQRVJEQVRBLkNPTQlraW1vb25raW0AigFe7kvRFYoBXxJYVRUPNRT9sWFg3sxt8q4A/ELbneeWSj2BahVIREZTX0RFTEVHQVRJT05fVE9LRU4SMTkyLjE2OC42LjE1Nzo4MDIwAA=="
    },
...
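For reference, a hedged sketch of how a base64 data item like the ones above can be decoded back into Hadoop tokens, which is essentially what the StarterTask "Read Hadoop tokens" log line reflects. readTokenStorageStream is the real Credentials API; the object name is illustrative:

import java.io.{ByteArrayInputStream, DataInputStream}
import java.util.Base64

import org.apache.hadoop.security.Credentials

object TokenDecoder {

  // Decodes one base64 secret data item back into Hadoop credentials.
  def decode(base64Data: String): Credentials = {
    val bytes = Base64.getDecoder.decode(base64Data)
    val credentials = new Credentials()
    credentials.readTokenStorageStream(
      new DataInputStream(new ByteArrayInputStream(bytes)))
    credentials
  }
}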

@kimoonkim (Member Author)

Perhaps @ifilonenko @foxish would be most interested. There is one sore point in the token refresh server prototype: I used the akka library for its message-driven actor threading model, which I think simplified the code a lot. (Akka is the standard actor library for Scala.)

But this is not a good fit for Spark as an application framework, which moved away from akka for dependency-management reasons. See details at http://apache-spark-developers-list.1001551.n3.nabble.com/Why-did-spark-switch-from-AKKA-to-net-td21522.html.

More specifically, many user applications that link to Spark also link to Akka as a library (e.g. say you want to write a service that receives requests via Akka and runs them on Spark). In that case, you'd have two conflicting versions of the Akka library in the same JVM.

But akka is a fine choice for the token refresh server, which is an independent service app and has little to do with Spark. So I think it's best for the token refresh server to live outside the Spark repo. Or, if that's too much work, keep it as an independent project with its own pom.xml and no dependencies to/from Spark.
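To illustrate, a minimal sketch of the actor-based design, assuming akka 2.x classic actors. StartRefresh matches the command name that appears in this PR's diffs; StopRefresh, the watched map, and the object name are illustrative:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import io.fabric8.kubernetes.api.model.Secret

// Commands sent to the refresh service by the scanner and the watcher.
case class StartRefresh(secret: Secret)
case class StopRefresh(secret: Secret)

class TokenRefreshActor extends Actor {

  // Secrets currently being refreshed, keyed by self link.
  private var watched = Map.empty[String, Secret]

  def receive: Receive = {
    case StartRefresh(secret) =>
      watched += secret.getMetadata.getSelfLink -> secret
      // A real implementation would schedule the first renew task here.
    case StopRefresh(secret) =>
      watched -= secret.getMetadata.getSelfLink
  }
}

object TokenRefreshSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("token-refresh")
    val service: ActorRef = system.actorOf(Props[TokenRefreshActor], "refresh-service")
    // The scanner and watcher send fire-and-forget commands, e.g.:
    //   service ! StartRefresh(secret)
  }
}

The fire-and-forget sends are what keep the scanner, the watcher, and the refresh service decoupled from each other.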

Thoughts?

private object TokenRefreshService {

  def apply(system: ActorSystem) : ActorRef = {
    UserGroupInformation.loginUserFromKeytab(
@kimoonkim (Member Author):

Reminder to myself. We'll have to refresh the kerberos TGT as well periodically.
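For reference, a minimal sketch of such periodic relogin. checkTGTAndReloginFromKeytab is a real UGI API and is a no-op while the TGT is still fresh; the scheduling interval and object name are assumptions:

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.security.UserGroupInformation

object KerberosReloginSketch {

  // Periodically relogs in from the keytab if the TGT is close to expiring.
  def start(): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit =
        UserGroupInformation.getLoginUser.checkTGTAndReloginFromKeytab()
    }, 1, 1, TimeUnit.HOURS)
  }
}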

@kimoonkim mentioned this pull request on Aug 23, 2017
@liyinan926 (Member)

Components 2 and 3 seem to serve similar purposes. Why do we need a periodic task to scan secrets given that we can use the watch API to receive secret-related events?

@kimoonkim (Member Author)

Glad you asked. Thanks for the question, @liyinan926. The scanner is needed for two cases:

  1. When the refresh server starts for the first time, or restarts after crashing. The watcher would not inform the refresh server of secrets that were added before the refresh server started.

  2. When the watcher connection to the K8s API server is intermittently unstable and the refresh server misses some events as a result. The periodic scanning eventually discovers secrets added or deleted during the bad period.

I'll update the PR description as well.

@liyinan926 (Member)

@kimoonkim Thanks for the clarification. Makes sense to me.

@kimoonkim (Member Author)

@ifilonenko and I talked last Thursday in a meeting. Two improvements were suggested (a sketch of the first follows the list):

  1. Currently, the refresh server starts by issuing DT renew RPCs for new DTs, only to find the DTs' expire times. We should encode those expire times in the secrets as annotations, so we can avoid the unnecessary NN RPCs.

  2. People will want to know the refresh status of the DTs that the refresh server maintains. The refresh server should expose a REST API for showing that status.
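A hedged sketch of the first improvement, assuming the Doneable edit() DSL of the fabric8 client versions of this era; the annotation key and object name are assumptions, not part of this PR:

import io.fabric8.kubernetes.client.KubernetesClient

object ExpireTimeAnnotationSketch {

  // Assumed annotation key; not part of this PR.
  val AnnotationKey = "refresh-hadoop-tokens/expire-time"

  // Records a token expire time on the secret so a restarted server can read
  // it back instead of issuing a renew RPC to the namenode.
  def write(client: KubernetesClient, namespace: String, secretName: String,
            expireTimeMillis: Long): Unit = {
    client.secrets().inNamespace(namespace).withName(secretName)
      .edit()
      .editMetadata()
      .addToAnnotations(AnnotationKey, expireTimeMillis.toString)
      .endMetadata()
      .done()
  }
}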

@kimoonkim (Member Author)

But akka is a fine choice for the token refresh server, which is an independent service app and has little to do with Spark. So I think it's best for the token refresh server to live outside the Spark repo. Or, if that's too much work, keep it as an independent project with its own pom.xml and no dependencies to/from Spark.

The refresh server code now exists as an independent project with its own pom.xml. This is to avoid including akka in the Spark distribution.

@kimoonkim (Member Author)

Now the refresh server has a Dockerfile and a k8s deployment yaml file. I successfully launched a pod.

There is a Kerberos login issue. Debugging it now.

@kimoonkim (Member Author)

The login issue was caused by the Dockerfile not putting the hadoop conf dir on the classpath. Fixed in the latest commit, and I confirmed the refresh server works as expected. I'll now address the other issues.

@kimoonkim (Member Author)

Thanks for the review, @liyinan926. Addressed the comments. PTAL.

@liyinan926 (Member) left a comment:

A couple of minor comments. Generally LGTM. Thanks!

@@ -69,7 +69,12 @@ private class TokenRefreshService(kubernetesClient: KubernetesClient) extends Ac
     scheduler.scheduleOnce(Duration(0L, TimeUnit.MILLISECONDS), task)
   }

-  private def addStarterTask(secret: Secret) = {
+  private def startRefresh(secret: Secret) = {
Member:

startRefreshTask.


  private def maybeObtainNewToken(token: Token[_ <: TokenIdentifier], expireTime: Long,
      nowMills: Long) = {
    val maybeNewToken = if (token.getKind.equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) {
Member:

Got it.

@ifilonenko (Member) left a comment:

This is overall nearing completion. I have included my comments below. However, IMO considerable unit and integration testing is still necessary.


Below is a list of the submodules for this cluster manager and what they do.

* `core`: Implementation of the Kubernetes cluster manager support.
* `token-refresh-server`: Extra Kubernetes service that refreshes Hadoop
tokens for long-running Spark jobs accessing secure data sources like
Member:

accessing/that access

@kimoonkim (Member Author):

Fixed.

mkdir -p /opt/token-refresh-server/jars && \
mkdir -p /opt/token-refresh-server/work-dir

ADD target/token-refresh-server-kubernetes_2.11-2.2.0-k8s-0.3.0-SNAPSHOT-assembly.tar.gz \
Member:

deprecated version, update to newest k8s assembly jar

    action match {
      case Action.ADDED =>
        logInfo(s"Found ${secret.getMetadata.getSelfLink} added")
        renewService ! StartRefresh(secret)
Member:

Agreed. Filtering shouldn't be done in receive, as it would block.

      FilterWatchListDeletable[Secret, SecretList, lang.Boolean, Watch, Watcher[Secret]] = {
    kubernetesClient
      .secrets()
      .inAnyNamespace()
Member:

As was specified before, inAnyNamespace() should be customized per use-case. I can send a PR upstream for this if you don't have the cycles to add that feature.

@kimoonkim (Member Author):

Fixed. PTAL if the fix is what you expected.

@kimoonkim (Member Author):

The latest commit addresses this, by switching between inAnyNamespace() and inNamespace() depending on a config key.

I have a question, though. When it uses inNamespace(), do we want the namespace value to default to the namespace that the refresh server itself runs in? If that's what most people want, we should support it, I think.

Member:

I guess. It might be easier to just pass in the name of the namespace. I see three options: anyNamespace(), inNamespace(), withNamespace(config.namespace).
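For illustration, a hedged sketch of config-driven scoping with the fabric8 client; the object and method names are illustrative:

import io.fabric8.kubernetes.api.model.SecretList
import io.fabric8.kubernetes.client.KubernetesClient

object NamespaceScopingSketch {

  // Scope the scan to one namespace if configured, else scan all namespaces.
  def listLabeledSecrets(client: KubernetesClient,
                         namespace: Option[String]): SecretList =
    namespace match {
      case Some(ns) =>
        client.secrets().inNamespace(ns)
          .withLabel("refresh-hadoop-tokens", "yes").list()
      case None =>
        client.secrets().inAnyNamespace()
          .withLabel("refresh-hadoop-tokens", "yes").list()
    }
}

If defaulting to the pod's own namespace is desired, the namespace can typically be read from /var/run/secrets/kubernetes.io/serviceaccount/namespace inside the pod.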


package object constants {

  val REFRESH_SERVER_KERBEROS_PRINCIPAL = "kimoonkim"
Member:

interesting principal here ;) is this for testing?

@kimoonkim (Member Author):

Yes, only for testing. We should make this configurable.

@kimoonkim (Member Author):

Fixed.

  val hadoopTokenPattern = Pattern.compile(SECRET_DATA_ITEM_KEY_REGEX_HADOOP_TOKENS)

  def apply(system: ActorSystem, kubernetesClient: KubernetesClient) : ActorRef = {
    UserGroupInformation.loginUserFromKeytab(
Member:

How is this being used?

@kimoonkim (Member Author):

loginUserFromKeytab sets the login user of the application as a static singleton field. The login user then gets used later, as the real user behind a proxy user, when a new job user token is obtained.

      nowMillis: Long) : Unit = {
    val durationUntilExpire = tokenToExpire.values.min - nowMillis
    val key = s"$SECRET_DATA_ITEM_KEY_PREFIX_HADOOP_TOKENS$nowMillis-$durationUntilExpire"
    val credentials = new Credentials()
Member:

Should this be blank? Or should this take in the user's Hadoop conf from their hdfs-site.xml?

@kimoonkim (Member Author):

I don't think the Credentials constructor takes in a Hadoop config object.

      realUser.toString
    }
    val credentials = new Credentials()
    val ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser)
Member:

Why create a proxy user instead of just using UGI.getLoginUser?

@kimoonkim (Member Author):

The refresh server is designed to serve multiple Spark jobs, which may come from many different users. The refresh server has its own headless Kerberos account as the login user, which is UGI.getLoginUser. The HDFS tokens it creates, on the other hand, belong to the job user accounts, because the jobs' drivers and executors use them to access the job users' data. The proxy user allows the refresh server to use its own account to obtain job user tokens. If it used the login user directly, the obtained tokens would not be able to access the job users' data. A sketch of that flow follows.
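A hedged sketch of obtaining job user tokens via a proxy user; createProxyUser, doAs, and addDelegationTokens are real Hadoop APIs, while the object and method names are illustrative:

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

object ProxyUserTokenSketch {

  // Obtains new HDFS delegation tokens owned by jobUser, using the refresh
  // server's login user (its headless Kerberos account) as the real user.
  def obtainTokensFor(jobUser: String, conf: Configuration): Credentials = {
    val loginUser = UserGroupInformation.getLoginUser
    val proxyUgi = UserGroupInformation.createProxyUser(jobUser, loginUser)
    val credentials = new Credentials()
    proxyUgi.doAs(new PrivilegedExceptionAction[Unit] {
      def run(): Unit =
        // The refresh server's principal is set as the renewer so it can
        // keep renewing the tokens it obtains.
        FileSystem.get(conf).addDelegationTokens(loginUser.getUserName, credentials)
    })
    credentials
  }
}

Note that HDFS must be configured to trust the refresh server's principal as a proxy user (the hadoop.proxyuser.*.hosts/groups settings) for this to work.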

kubectl create secret generic hadoop-token-refresh-server-kerberos-keytab \
--from-file /mnt/secrets/krb5.keytab

5. Create a k8s `service account` and `clusterrolebinding` that the service pod will use.
Member:

Note as optional

@kimoonkim (Member Author)

Thanks for the review, @ifilonenko. Yes, I plan to add unit tests before this gets merged in. But I hope we can delay integration tests until the other PR is merged in, so that we can verify end-to-end in the integration test.

@kimoonkim (Member Author)

I have been adding unit tests. I am getting there, but still need to add tests for renew tasks.

@kimoonkim (Member Author)

Reminder to myself. I found an edge case that the current code does not handle. If a refresh server was down for a while, say a few days, and is brought back up, the namenode may have removed some expired tokens in the meantime. It will then return the following error when the refresh server tries to renew the token in the orphaned secret. We should think about how to fix this (a possible mitigation is sketched after the log):

17/10/12 12:00:12 WARN RenewTask: Renewal request for unknown token
at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:502)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:6684)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:570)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1005)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

org.apache.hadoop.security.token.SecretManager$InvalidToken: Renewal request for unknown token
at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:502)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:6684)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:570)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1005)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:1121)
at org.apache.hadoop.security.token.Token.renew(Token.java:377)
at org.apache.spark.security.kubernetes.RenewTask.maybeGetNewExpireTime(TokenRefreshService.scala:283)
at org.apache.spark.security.kubernetes.RenewTask.org$apache$spark$security$kubernetes$RenewTask$$refresh(TokenRefreshService.scala:248)
at org.apache.spark.security.kubernetes.RenewTask$$anonfun$4.apply(TokenRefreshService.scala:226)
at org.apache.spark.security.kubernetes.RenewTask$$anonfun$4.apply(TokenRefreshService.scala:224)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.security.kubernetes.RenewTask.run(TokenRefreshService.scala:224)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Renewal request for unknown token
at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:502)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:6684)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:570)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1005)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at org.apache.hadoop.ipc.Client.call(Client.java:1470)
at org.apache.hadoop.ipc.Client.call(Client.java:1401)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy28.renewDelegationToken(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewDelegationToken(ClientNamenodeProtocolTranslatorPB.java:924)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy29.renewDelegationToken(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:1119)
... 16 more
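One possible mitigation, sketched here under assumptions: catch the InvalidToken failure on renewal and fall back to obtaining a brand-new replacement token. SecretManager.InvalidToken is the real exception type from the trace above; the obtainReplacement callback and object name are illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.token.{SecretManager, Token, TokenIdentifier}

object OrphanedTokenRecoverySketch {

  // Tries to renew; if the namenode no longer knows the token (it expired
  // while the server was down), falls back to obtaining a replacement.
  def renewOrReplace(token: Token[_ <: TokenIdentifier],
                     conf: Configuration,
                     obtainReplacement: () => Long): Long =
    try {
      token.renew(conf)
    } catch {
      case _: SecretManager.InvalidToken => obtainReplacement()
    }
}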

@kimoonkim (Member Author)

I added unit tests for all major components and their common-case code paths. (Error-case code paths are not covered yet.) PTAL.

@ifilonenko (Member)

Will be doing another pass for unit tests, thank you for this!

@kimoonkim (Member Author)

Yes, please take a look at the unit tests. Also, please note that the unit tests are not yet run by CI. I'll see what I can do about that.

@kimoonkim (Member Author)

Running unit tests from the top level, like below, fails. It seems like dependency issues; I need to investigate:

./build/mvn clean test -Pmesos -Pyarn -Phadoop-2.7 -Pkubernetes -Pkubernetes-hdfs-extra -pl core,resource-managers/kubernetes/core,resource-managers/kubernetes/token-refresh-server -am -Dtest=none '-Dsuffixes=^org.apache.spark.(?!SortShuffleSuite$|rdd.LocalCheckpointSuite$|deploy.SparkSubmitSuite$|deploy.StandaloneDynamicAllocationSuite$).*'
