Skip to content

Commit

Permalink
Annotate StrimziPodSets before rolling during CA key replacement (#10876
Browse files Browse the repository at this point in the history
)

Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
  • Loading branch information
katheris authored Nov 25, 2024
1 parent 1b275e5 commit 6ef088d
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Allow using custom certificates for communication with container registry in Kafka Connect Build with Kaniko
* Only roll pods once for ClientsCa cert renewal.
This change also means the restart reason ClientCaCertKeyReplaced is removed and either CaCertRenewed or CaCertHasOldGeneration will be used.
* Allow rolling update for new cluster CA trust (during Cluster CA key replacement) to continue where it left off before interruption without rolling all pods again.

### Major changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,36 @@ public static StrimziPodSet createPodSet(
.build();
}

/**
* Patch a Strimzi PodSet to merge the provided annotations with the annotations on the Pod resources defined
* in the PodSet
*
* @param strimziPodSet Strimzi PodSet to patch
* @param annotationsToBeUpdated Annotations to merge with the existing annotations
*
* @return Patched PodSet
*/
public static StrimziPodSet patchAnnotations(StrimziPodSet strimziPodSet, Map<String, String> annotationsToBeUpdated) {
List<Map<String, Object>> newPods = PodSetUtils.podSetToPods(strimziPodSet)
.stream()
.map(pod -> {
Map<String, String> updatedAnnotations = pod.getMetadata().getAnnotations();
updatedAnnotations.putAll(annotationsToBeUpdated);
return pod.edit()
.editMetadata()
.withAnnotations(updatedAnnotations)
.endMetadata()
.build();
})
.map(PodSetUtils::podToMap)
.toList();
return new StrimziPodSetBuilder(strimziPodSet)
.editSpec()
.withPods(newPods)
.endSpec()
.build();
}

/**
* Creates a stateful Pod for use with StrimziPodSets. Stateful in this context means that it has a stable name and
* typically uses storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
Expand All @@ -24,6 +23,7 @@
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
import io.strimzi.operator.cluster.model.WorkloadUtils;
import io.strimzi.operator.cluster.operator.resource.KafkaAgentClientProvider;
import io.strimzi.operator.cluster.operator.resource.KafkaRoller;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
Expand Down Expand Up @@ -54,11 +54,11 @@

import java.time.Clock;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Class used for reconciliation of Cluster and Client CAs. This class contains both the steps of the CA reconciliation
Expand Down Expand Up @@ -234,8 +234,6 @@ Future<Void> reconcileCas(Clock clock) {
Secret clusterCaKeySecret = null;
Secret clientsCaCertSecret = null;
Secret clientsCaKeySecret = null;
List<HasMetadata> clusterCaSecrets = new ArrayList<>();
List<HasMetadata> clientsCaSecrets = new ArrayList<>();

for (Secret secret : clusterSecrets) {
String secretName = secret.getMetadata().getName();
Expand All @@ -247,13 +245,6 @@ Future<Void> reconcileCas(Clock clock) {
clientsCaCertSecret = secret;
} else if (secretName.equals(clientsCaKeyName)) {
clientsCaKeySecret = secret;
} else if (secretName.equals(KafkaResources.kafkaSecretName(reconciliation.name()))) {
clusterCaSecrets.add(secret);
clientsCaSecrets.add(secret);
} else if (secretName.equals(KafkaResources.clusterOperatorCertsSecretName(reconciliation.name()))) {
// The CO certificate is excluded as it is renewed in a separate cycle
} else {
clusterCaSecrets.add(secret);
}
}

Expand All @@ -266,7 +257,6 @@ Future<Void> reconcileCas(Clock clock) {
reconciliation.namespace(), caLabels,
clusterCaCertLabels, clusterCaCertAnnotations,
clusterCaConfig != null && !clusterCaConfig.isGenerateSecretOwnerReference() ? null : ownerRef,
clusterCaSecrets,
Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant()));

clientsCa = new ClientsCa(reconciliation, certManager,
Expand All @@ -280,7 +270,6 @@ Future<Void> reconcileCas(Clock clock) {
clientsCa.createRenewOrReplace(reconciliation.namespace(),
caLabels, Map.of(), Map.of(),
clientsCaConfig != null && !clientsCaConfig.isGenerateSecretOwnerReference() ? null : ownerRef,
clientsCaSecrets,
Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant()));

return null;
Expand Down Expand Up @@ -368,7 +357,7 @@ Future<Void> maybeRollingUpdateForNewClusterCaKey() {
TlsPemIdentity coTlsPemIdentity = new TlsPemIdentity(new PemTrustSet(clusterCa.caCertSecret()), PemAuthIdentity.clusterOperator(coSecret));
return getZooKeeperReplicas()
.compose(replicas -> rollZookeeper(replicas, restartReason, coTlsPemIdentity))
.compose(i -> getKafkaReplicas())
.compose(i -> patchClusterCaKeyGenerationAndReturnNodes())
.compose(nodes -> rollKafkaBrokers(nodes, RestartReasons.of(restartReason), coTlsPemIdentity))
.compose(i -> rollDeploymentIfExists(KafkaResources.entityOperatorDeploymentName(reconciliation.name()), restartReason))
.compose(i -> rollDeploymentIfExists(KafkaExporterResources.componentName(reconciliation.name()), restartReason))
Expand Down Expand Up @@ -494,30 +483,46 @@ Future<Void> maybeRollingUpdateForNewClusterCaKey() {
.maybeRollingUpdate(reconciliation, replicas, zkSelectorLabels, rollZkPodAndLogReason, coTlsPemIdentity);
}

/* test */ Future<Set<NodeRef>> getKafkaReplicas() {
/**
* Patches the Kafka StrimziPodSets to update the Cluster CA key generation annotation and returns the nodes.
*
* @return Future containing the set of Kafka nodes which completes when the StrimziPodSets have been patched.
*/
/* test */ Future<Set<NodeRef>> patchClusterCaKeyGenerationAndReturnNodes() {
Labels selectorLabels = Labels.EMPTY
.withStrimziKind(reconciliation.kind())
.withStrimziCluster(reconciliation.name())
.withStrimziName(KafkaResources.kafkaComponentName(reconciliation.name()));

return strimziPodSetOperator.listAsync(reconciliation.namespace(), selectorLabels)
.compose(podSets -> {
Set<NodeRef> nodes = new LinkedHashSet<>();

if (podSets != null) {
for (StrimziPodSet podSet : podSets) {
nodes.addAll(ReconcilerUtils.nodesFromPodSet(podSet));
}
List<StrimziPodSet> updatedPodSets = podSets
.stream()
.map(podSet -> WorkloadUtils.patchAnnotations(
podSet,
Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(clusterCa.caKeyGeneration()))
)).toList();
return strimziPodSetOperator.batchReconcile(reconciliation, reconciliation.namespace(), updatedPodSets, selectorLabels)
.map(i -> updatedPodSets.stream().flatMap(podSet -> ReconcilerUtils.nodesFromPodSet(podSet).stream())
.collect(Collectors.toSet()));
} else {
return Future.succeededFuture(Set.of());
}

return Future.succeededFuture(nodes);
});
}

/* test */ Future<Void> rollKafkaBrokers(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
return createKafkaRoller(nodes, coTlsPemIdentity).rollingRestart(pod -> {
LOGGER.debugCr(reconciliation, "Rolling Pod {} due to {}", pod.getMetadata().getName(), podRollReasons.getReasons());
return podRollReasons;
int clusterCaKeyGeneration = clusterCa.caKeyGeneration();
int podClusterCaKeyGeneration = Annotations.intAnnotation(pod, Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, clusterCaKeyGeneration);
if (clusterCaKeyGeneration == podClusterCaKeyGeneration) {
LOGGER.debugCr(reconciliation, "Not rolling Pod {} since the Cluster CA cert key generation is correct.", pod.getMetadata().getName());
return RestartReasons.empty();
} else {
LOGGER.debugCr(reconciliation, "Rolling Pod {} due to {}", pod.getMetadata().getName(), podRollReasons.getReasons());
return podRollReasons;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.time.Instant;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.emptyMap;
Expand All @@ -41,7 +40,7 @@ public void testRemoveExpiredCertificate() {

ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(clock), new PasswordGenerator(10, "a", "a"), cluster, null, null);
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);
assertThat(clusterCa.caCertSecret().getData().size(), is(3));

// force key replacement so certificate renewal ...
Expand All @@ -56,7 +55,7 @@ public void testRemoveExpiredCertificate() {

clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(clock), new PasswordGenerator(10, "a", "a"), cluster, clusterCa.caCertSecret(), caKeySecretWithReplaceAnno);
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);
assertThat(clusterCa.caCertSecret().getData().size(), is(4));
assertThat(clusterCa.caCertSecret().getData().containsKey("ca-2023-03-23T09-00-00Z.crt"), is(true));

Expand All @@ -66,7 +65,7 @@ public void testRemoveExpiredCertificate() {

clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), cluster, clusterCa.caCertSecret(), clusterCa.caKeySecret());
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);
assertThat(clusterCa.caCertSecret().getData().size(), is(3));
assertThat(clusterCa.caCertSecret().getData().containsKey("ca-2023-03-23T09-00-00Z.crt"), is(false));
}
Expand All @@ -79,7 +78,7 @@ public void testIsExpiringCertificate() {

ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(clock), new PasswordGenerator(10, "a", "a"), cluster, null, null);
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);

// check certificate expiration out of the renewal period, certificate is not expiring
instantExpected = "2023-02-15T09:00:00Z";
Expand All @@ -102,7 +101,7 @@ public void testRemoveOldCertificate() {

ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(clock), new PasswordGenerator(10, "a", "a"), cluster, null, null);
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);
assertThat(clusterCa.caCertSecret().getData().size(), is(3));

// force key replacement so certificate renewal ...
Expand All @@ -117,7 +116,7 @@ public void testRemoveOldCertificate() {

clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(clock), new PasswordGenerator(10, "a", "a"), cluster, clusterCa.caCertSecret(), caKeySecretWithReplaceAnno);
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);
assertThat(clusterCa.caCertSecret().getData().size(), is(4));
assertThat(clusterCa.caCertSecret().getData().containsKey("ca-2023-03-23T09-00-00Z.crt"), is(true));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ private void checkHeadlessService(Service headless) {

private Secret generateBrokerSecret(Set<String> externalBootstrapAddress, Map<Integer, Set<String>> externalAddresses) {
ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), CLUSTER, null, null);
clusterCa.createRenewOrReplace(NAMESPACE, Map.of(), Map.of(), Map.of(), null, List.of(), true);
clusterCa.createRenewOrReplace(NAMESPACE, Map.of(), Map.of(), Map.of(), null, true);
ClientsCa clientsCa = new ClientsCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), null, null, null, null, 365, 30, true, CertificateExpirationPolicy.RENEW_CERTIFICATE);
clientsCa.createRenewOrReplace(NAMESPACE, Map.of(), Map.of(), Map.of(), null, List.of(), true);
clientsCa.createRenewOrReplace(NAMESPACE, Map.of(), Map.of(), Map.of(), null, true);

return KC.generateCertificatesSecret(clusterCa, clientsCa, null, externalBootstrapAddress, externalAddresses, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ private void checkHeadlessService(Service headless) {

private Secret generateBrokerSecret(Set<String> externalBootstrapAddress, Map<Integer, Set<String>> externalAddresses) {
ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), CLUSTER, null, null);
clusterCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, true);
ClientsCa clientsCa = new ClientsCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), null, null, null, null, 365, 30, true, CertificateExpirationPolicy.RENEW_CERTIFICATE);
clientsCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clientsCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, true);

return KC.generateCertificatesSecret(clusterCa, clientsCa, null, externalBootstrapAddress, externalAddresses, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder;
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.api.kafka.model.podset.StrimziPodSet;
import io.strimzi.api.kafka.model.podset.StrimziPodSetBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -363,6 +364,44 @@ public void testCreateStrimziPodSetFromNodeReferencesWithTemplate() {
assertThat(sps.getSpec().getPods().stream().map(pod -> PodSetUtils.mapToPod(pod).getMetadata().getName()).toList(), hasItems("my-cluster-nodes-10", "my-cluster-nodes-11", "my-cluster-nodes-12"));
}

@Test
public void testPatchPodAnnotations() {
Map<String, String> annotations = Map.of("anno-1", "value-1", "anno-2", "value-2", "anno-3", "value-3");
List<Pod> pods = new ArrayList<>();
pods.add(new PodBuilder()
.withNewMetadata()
.withName("pod-0")
.withNamespace(NAMESPACE)
.withAnnotations(annotations)
.endMetadata()
.build()
);
pods.add(new PodBuilder()
.withNewMetadata()
.withName("pod-1")
.withNamespace(NAMESPACE)
.withAnnotations(annotations)
.endMetadata()
.build()
);

StrimziPodSet sps = new StrimziPodSetBuilder()
.withNewMetadata()
.withName("my-sps")
.withNamespace(NAMESPACE)
.endMetadata()
.withNewSpec()
.withPods(PodSetUtils.podsToMaps(pods))
.endSpec()
.build();

List<Pod> resultPods = PodSetUtils.podSetToPods(WorkloadUtils.patchAnnotations(sps, Map.of("anno-2", "value-2a", "anno-4", "value-4")));
assertThat(resultPods.size(), is(2));
Map<String, String> expectedAnnotations = Map.of("anno-1", "value-1", "anno-2", "value-2a", "anno-3", "value-3", "anno-4", "value-4");
assertThat(resultPods.get(0).getMetadata().getAnnotations(), is(expectedAnnotations));
assertThat(resultPods.get(1).getMetadata().getAnnotations(), is(expectedAnnotations));
}

//////////////////////////////////////////////////
// Stateful Pod tests
//////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private void checkHeadlessService(Service headless) {

private Secret generateCertificatesSecret() {
ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), CLUSTER, null, null);
clusterCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, true);

return ZC.generateCertificatesSecret(clusterCa, null, true);
}
Expand Down
Loading

0 comments on commit 6ef088d

Please sign in to comment.