From 2511b94074e779b4e3de2e890d5410c4281407b7 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Mon, 4 Nov 2024 01:05:54 -0300 Subject: [PATCH] feat: reverse home to edge mirroring (#4231) --- Cargo.lock | 13 +- Cargo.toml | 12 +- .../fluvio-cli/src/client/topic/add_mirror.rs | 4 + crates/fluvio-cli/src/client/topic/create.rs | 19 +- .../fluvio-controlplane-metadata/Cargo.toml | 3 +- .../fluvio-controlplane-metadata/src/lib.rs | 5 + .../src/mirror/k8.rs | 2 +- .../src/mirror/status.rs | 5 +- .../src/partition/k8.rs | 3 +- .../src/partition/spec.rs | 57 +++- .../src/topic/k8.rs | 34 ++- .../src/topic/spec.rs | 227 ++++++++++++++-- .../src/topic/update.rs | 2 + .../tests/k8_topic_mirror_down_v1.json | 32 +++ .../tests/k8_topic_mirror_down_v2.json | 8 +- crates/fluvio-protocol/Cargo.toml | 2 +- crates/fluvio-protocol/src/link/error_code.rs | 3 + crates/fluvio-sc-schema/Cargo.toml | 2 +- crates/fluvio-sc-schema/src/objects/mod.rs | 2 +- .../src/controllers/mirroring/controller.rs | 17 +- .../src/controllers/topics/policy.rs | 1 + crates/fluvio-sc/src/services/auth/basic.rs | 4 +- .../services/public_api/mirroring/connect.rs | 2 +- .../public_api/topic/update/add_mirror.rs | 1 + crates/fluvio-smartmodule/Cargo.toml | 2 +- crates/fluvio-spu-schema/Cargo.toml | 2 +- .../src/control_plane/dispatcher.rs | 4 +- crates/fluvio-spu/src/core/global_context.rs | 4 + .../fluvio-spu/src/mirroring/home/api_key.rs | 1 + .../src/mirroring/home/connection.rs | 253 +++++++++++++++++- .../fluvio-spu/src/mirroring/home/home_api.rs | 7 + crates/fluvio-spu/src/mirroring/home/mod.rs | 1 + crates/fluvio-spu/src/mirroring/home/sync.rs | 67 +++++ .../src/mirroring/home/update_offsets.rs | 2 +- crates/fluvio-spu/src/mirroring/mod.rs | 2 +- .../src/mirroring/remote/api_key.rs | 1 + .../src/mirroring/remote/controller.rs | 210 ++++++++++++--- crates/fluvio-spu/src/mirroring/remote/mod.rs | 1 + .../src/mirroring/remote/remote_api.rs | 17 +- .../fluvio-spu/src/mirroring/remote/sync.rs | 6 +- .../src/mirroring/remote/update_offsets.rs | 33 +++ .../fluvio-spu/src/mirroring/test/fixture.rs | 56 ++-- .../src/mirroring/test/integration.rs | 203 ++++++++++++-- .../src/replication/leader/leaders_state.rs | 7 +- .../src/services/public/produce_handler.rs | 9 +- crates/fluvio/Cargo.toml | 2 +- k8-util/helm/fluvio-sys/Chart.yaml | 2 +- .../fluvio-sys/templates/crd_partition.yaml | 4 + .../helm/fluvio-sys/templates/crd_topic.yaml | 30 ++- .../e2e/fluvio-core.bats | 93 ++++++- .../mirror-topic-reverse.bats | 91 +++++++ 51 files changed, 1380 insertions(+), 190 deletions(-) create mode 100644 crates/fluvio-controlplane-metadata/tests/k8_topic_mirror_down_v1.json create mode 100644 crates/fluvio-spu/src/mirroring/home/sync.rs create mode 100644 crates/fluvio-spu/src/mirroring/remote/update_offsets.rs create mode 100644 tests/cli/mirroring_smoke_tests/mirror-topic-reverse.bats diff --git a/Cargo.lock b/Cargo.lock index 4051673783..ed705adf7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2372,7 +2372,7 @@ dependencies = [ [[package]] name = "fluvio" -version = "0.23.4" +version = "0.24.0" dependencies = [ "anyhow", "async-channel 1.9.0", @@ -2724,12 +2724,13 @@ dependencies = [ [[package]] name = "fluvio-controlplane-metadata" -version = "0.29.1" +version = "0.30.0" dependencies = [ "anyhow", "base64 0.22.1", "bytes", "bytesize", + "cfg-if", "derive_builder", "flate2", "fluvio-protocol", @@ -2911,7 +2912,7 @@ dependencies = [ [[package]] name = "fluvio-protocol" -version = "0.11.0" +version = "0.12.0" dependencies = [ "bytes", "cfg-if", 
@@ -2995,7 +2996,7 @@ dependencies = [ [[package]] name = "fluvio-sc-schema" -version = "0.24.3" +version = "0.25.0" dependencies = [ "anyhow", "fluvio-controlplane-metadata", @@ -3048,7 +3049,7 @@ dependencies = [ [[package]] name = "fluvio-smartmodule" -version = "0.7.5" +version = "0.8.0" dependencies = [ "criterion", "eyre", @@ -3141,7 +3142,7 @@ dependencies = [ [[package]] name = "fluvio-spu-schema" -version = "0.16.1" +version = "0.17.0" dependencies = [ "bytes", "derive_builder", diff --git a/Cargo.toml b/Cargo.toml index 5fc534f96e..6b5830b726 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,24 +166,24 @@ k8-diff = { version = "0.1.2" } trybuild = { branch = "check_option", git = "https://github.com/infinyon/trybuild" } # Internal fluvio dependencies -fluvio = { version = "0.23.3", path = "crates/fluvio" } +fluvio = { version = "0.24.0", path = "crates/fluvio" } fluvio-auth = { path = "crates/fluvio-auth" } fluvio-channel = { path = "crates/fluvio-channel" } fluvio-cli-common = { path = "crates/fluvio-cli-common"} fluvio-compression = { version = "0.3.2", path = "crates/fluvio-compression", default-features = false } fluvio-connector-package = { path = "crates/fluvio-connector-package/" } fluvio-controlplane = { path = "crates/fluvio-controlplane" } -fluvio-controlplane-metadata = { version = "0.29.0", default-features = false, path = "crates/fluvio-controlplane-metadata" } +fluvio-controlplane-metadata = { version = "0.30.0", default-features = false, path = "crates/fluvio-controlplane-metadata" } fluvio-extension-common = { path = "crates/fluvio-extension-common", default-features = false } fluvio-hub-util = { path = "crates/fluvio-hub-util" } fluvio-package-index = { version = "0.7.6", path = "crates/fluvio-package-index", default-features = false } -fluvio-protocol = { version = "0.11.0", path = "crates/fluvio-protocol" } -fluvio-sc-schema = { version = "0.24.0", path = "crates/fluvio-sc-schema", default-features = false } +fluvio-protocol = { version = "0.12.0", path = "crates/fluvio-protocol" } +fluvio-sc-schema = { version = "0.25.0", path = "crates/fluvio-sc-schema", default-features = false } fluvio-service = { path = "crates/fluvio-service" } fluvio-smartengine = { version = "0.8.0", path = "crates/fluvio-smartengine", default-features = false } -fluvio-smartmodule = { version = "0.7.4", path = "crates/fluvio-smartmodule", default-features = false } +fluvio-smartmodule = { version = "0.8.0", path = "crates/fluvio-smartmodule", default-features = false } fluvio-socket = { version = "0.14.9", path = "crates/fluvio-socket", default-features = false } -fluvio-spu-schema = { version = "0.16.0", path = "crates/fluvio-spu-schema", default-features = false } +fluvio-spu-schema = { version = "0.17.0", path = "crates/fluvio-spu-schema", default-features = false } fluvio-storage = { path = "crates/fluvio-storage" } fluvio-stream-dispatcher = { version = "0.13.2", path = "crates/fluvio-stream-dispatcher" } fluvio-stream-model = { version = "0.11.3", path = "crates/fluvio-stream-model", default-features = false } diff --git a/crates/fluvio-cli/src/client/topic/add_mirror.rs b/crates/fluvio-cli/src/client/topic/add_mirror.rs index 73611d9241..d89326eea0 100644 --- a/crates/fluvio-cli/src/client/topic/add_mirror.rs +++ b/crates/fluvio-cli/src/client/topic/add_mirror.rs @@ -16,6 +16,9 @@ pub struct AddMirrorOpt { topic: String, /// Remote cluster to add remote: String, + /// if set, it will mirror from home to remote + #[arg(long)] + home_to_remote: bool, } impl AddMirrorOpt { @@ -24,6 
+27,7 @@ impl AddMirrorOpt { let request = AddMirror { remote_cluster: self.remote.clone(), + home_to_mirror: self.home_to_remote, }; let action = UpdateTopicAction::AddMirror(request); diff --git a/crates/fluvio-cli/src/client/topic/create.rs b/crates/fluvio-cli/src/client/topic/create.rs index 85b4d2e7fa..6a1dbb326d 100644 --- a/crates/fluvio-cli/src/client/topic/create.rs +++ b/crates/fluvio-cli/src/client/topic/create.rs @@ -130,6 +130,10 @@ pub struct CreateTopicOpt { /// or inside the topic configuration file in YAML format. #[arg(short, long, value_name = "PATH", conflicts_with = "config-arg")] config: Option, + + /// signify that this topic can be mirror from home to edge + #[arg(long)] + home_to_remote: bool, } impl CreateTopicOpt { @@ -163,7 +167,10 @@ impl CreateTopicOpt { &topic_name, )?) } else if let Some(mirror_assign_file) = &self.mirror_apply { - let config = MirrorConfig::read_from_json_file(mirror_assign_file, &topic_name)?; + let mut config = MirrorConfig::read_from_json_file(mirror_assign_file, &topic_name)?; + + config.set_home_to_remote(self.home_to_remote)?; + let targets = match config { MirrorConfig::Home(ref c) => c .partitions() @@ -199,7 +206,9 @@ impl CreateTopicOpt { ReplicaSpec::Mirror(config) } else if self.mirror { - let mirror_map = MirrorConfig::Home(HomeMirrorConfig::from(vec![])); + let mut home_mirror = HomeMirrorConfig::from(vec![]); + home_mirror.source = self.home_to_remote; + let mirror_map = MirrorConfig::Home(home_mirror); ReplicaSpec::Mirror(mirror_map) } else { ReplicaSpec::Computed(TopicReplicaParam { @@ -349,14 +358,16 @@ mod load { partitions[0], HomePartitionConfig { remote_cluster: "boat1".to_string(), - remote_replica: "boats-0".to_string() + remote_replica: "boats-0".to_string(), + ..Default::default() } ); assert_eq!( partitions[1], HomePartitionConfig { remote_cluster: "boat2".to_string(), - remote_replica: "boats-0".to_string() + remote_replica: "boats-0".to_string(), + ..Default::default() } ); } diff --git a/crates/fluvio-controlplane-metadata/Cargo.toml b/crates/fluvio-controlplane-metadata/Cargo.toml index 3cd4e3549f..4ec1e2903e 100644 --- a/crates/fluvio-controlplane-metadata/Cargo.toml +++ b/crates/fluvio-controlplane-metadata/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-controlplane-metadata" edition = "2021" -version = "0.29.1" +version = "0.30.0" authors = ["Fluvio Contributors "] description = "Metadata definition for Fluvio control plane" repository = "https://github.com/infinyon/fluvio" @@ -17,6 +17,7 @@ use_serde = ["serde","semver/serde", "bytesize/serde", "humantime-serde", "serde k8 = ["use_serde", "fluvio-stream-model/k8"] [dependencies] +cfg-if = { workspace = true } thiserror = { workspace = true } base64 = { workspace = true } bytes = { workspace = true } diff --git a/crates/fluvio-controlplane-metadata/src/lib.rs b/crates/fluvio-controlplane-metadata/src/lib.rs index b6fd323cfa..b0ad920946 100644 --- a/crates/fluvio-controlplane-metadata/src/lib.rs +++ b/crates/fluvio-controlplane-metadata/src/lib.rs @@ -14,6 +14,11 @@ pub mod store { pub use fluvio_stream_model::store::*; } +#[cfg(feature = "use_serde")] +pub(crate) fn is_false(b: &bool) -> bool { + !b +} + #[cfg(feature = "k8")] pub use fluvio_stream_model::k8_types; diff --git a/crates/fluvio-controlplane-metadata/src/mirror/k8.rs b/crates/fluvio-controlplane-metadata/src/mirror/k8.rs index 316fac82a0..ae276258a6 100644 --- a/crates/fluvio-controlplane-metadata/src/mirror/k8.rs +++ b/crates/fluvio-controlplane-metadata/src/mirror/k8.rs @@ -35,7 +35,7 @@ 
mod test_v1_spec { type K8RemoteSpec = K8Obj; #[test] - fn read_k8_mirror_json() { + fn read_k8_mirror_json_v1() { let reader: BufReader = BufReader::new(File::open("tests/k8_mirror_v1.json").expect("spec")); let cluster: K8RemoteSpec = serde_json::from_reader(reader).expect("failed to parse topic"); diff --git a/crates/fluvio-controlplane-metadata/src/mirror/status.rs b/crates/fluvio-controlplane-metadata/src/mirror/status.rs index 8cc4c7a176..e609621244 100644 --- a/crates/fluvio-controlplane-metadata/src/mirror/status.rs +++ b/crates/fluvio-controlplane-metadata/src/mirror/status.rs @@ -1,4 +1,3 @@ -use std::time::Duration; use fluvio_protocol::{Encoder, Decoder}; #[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)] @@ -101,7 +100,7 @@ pub struct ConnectionStat { impl MirrorStatus { #[cfg(feature = "use_serde")] - pub fn last_seen(&self, since: Duration) -> String { + pub fn last_seen(&self, since: std::time::Duration) -> String { use humantime_serde::re::humantime; let since_sec = since.as_secs(); @@ -153,6 +152,8 @@ impl std::fmt::Display for ConnectionStatus { #[cfg(test)] mod test { + use std::time::Duration; + use super::*; #[test] diff --git a/crates/fluvio-controlplane-metadata/src/partition/k8.rs b/crates/fluvio-controlplane-metadata/src/partition/k8.rs index d0f505773a..10b3f660ec 100644 --- a/crates/fluvio-controlplane-metadata/src/partition/k8.rs +++ b/crates/fluvio-controlplane-metadata/src/partition/k8.rs @@ -50,7 +50,8 @@ mod test_spec { mirror, PartitionMirrorConfig::Home(HomePartitionConfig { remote_cluster: "boat1".to_string(), - remote_replica: "boats-0".to_string() + remote_replica: "boats-0".to_string(), + ..Default::default() }) ); } diff --git a/crates/fluvio-controlplane-metadata/src/partition/spec.rs b/crates/fluvio-controlplane-metadata/src/partition/spec.rs index 2281201bc2..929c3cca63 100644 --- a/crates/fluvio-controlplane-metadata/src/partition/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/partition/spec.rs @@ -1,11 +1,7 @@ #![allow(clippy::assign_op_pattern)] -//! -//! # Partition Spec -//! -//! 
use fluvio_types::SpuId; -use fluvio_protocol::{Encoder, Decoder}; +use fluvio_protocol::{link::ErrorCode, Decoder, Encoder}; use crate::topic::{CleanupPolicy, CompressionAlgorithm, Deduplication, TopicSpec, TopicStorageConfig}; @@ -82,7 +78,24 @@ impl PartitionSpec { pub fn mirror_string(&self) -> String { if let Some(mirror) = &self.mirror { - mirror.external_cluster() + let external = mirror.external_cluster(); + match mirror { + PartitionMirrorConfig::Remote(remote) => { + if remote.target { + format!("{}(to-home)", external) + } else { + format!("{}(from-home)", external) + } + } + + PartitionMirrorConfig::Home(home) => { + if home.source { + format!("{}(from-remote)", external) + } else { + format!("{}(to-remote)", external) + } + } + } } else { "".to_owned() } @@ -149,9 +162,30 @@ impl PartitionMirrorConfig { } } + #[deprecated(since = "0.29.1")] pub fn is_home_mirror(&self) -> bool { matches!(self, Self::Home(_)) } + + /// check whether this mirror should accept traffic + pub fn accept_traffic(&self) -> Option { + match self { + Self::Remote(r) => { + if r.target { + Some(ErrorCode::MirrorProduceFromRemoteNotAllowed) + } else { + None + } + } + Self::Home(h) => { + if h.source { + None + } else { + Some(ErrorCode::MirrorProduceFromHome) + } + } + } + } } impl std::fmt::Display for PartitionMirrorConfig { @@ -172,6 +206,12 @@ impl std::fmt::Display for PartitionMirrorConfig { pub struct HomePartitionConfig { pub remote_cluster: String, pub remote_replica: String, + // if this is set, home will be mirror instead of + #[cfg_attr( + feature = "use_serde", + serde(default, skip_serializing_if = "crate::is_false") + )] + pub source: bool, } impl std::fmt::Display for HomePartitionConfig { @@ -192,6 +232,11 @@ pub struct RemotePartitionConfig { #[cfg_attr(feature = "use_serde", serde(default))] pub home_spu_id: SpuId, pub home_spu_endpoint: String, + #[cfg_attr( + feature = "use_serde", + serde(default, skip_serializing_if = "crate::is_false") + )] + pub target: bool, } impl std::fmt::Display for RemotePartitionConfig { diff --git a/crates/fluvio-controlplane-metadata/src/topic/k8.rs b/crates/fluvio-controlplane-metadata/src/topic/k8.rs index ea5ad491d5..ea3c1b02e8 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/k8.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/k8.rs @@ -50,7 +50,33 @@ mod test_spec { } #[test] - fn read_k8_topic_partition_mirror_json() { + fn read_k8_topic_partition_mirror_json_v1() { + let reader: BufReader = + BufReader::new(File::open("tests/k8_topic_mirror_down_v1.json").expect("spec")); + let topic: K8TopicSpec = serde_json::from_reader(reader).expect("failed to parse topic"); + assert_eq!(topic.metadata.name, "downstream-topic"); + assert_eq!( + topic.spec.replicas().to_owned(), + ReplicaSpec::Mirror(MirrorConfig::Home( + vec![ + HomePartitionConfig { + remote_cluster: "boat1".to_string(), + remote_replica: "boats-0".to_string(), + ..Default::default() + }, + HomePartitionConfig { + remote_cluster: "boat2".to_string(), + remote_replica: "boats-0".to_string(), + ..Default::default() + } + ] + .into() + )) + ); + } + + #[test] + fn read_k8_topic_partition_mirror_json_v2() { let reader: BufReader = BufReader::new(File::open("tests/k8_topic_mirror_down_v2.json").expect("spec")); let topic: K8TopicSpec = serde_json::from_reader(reader).expect("failed to parse topic"); @@ -61,11 +87,13 @@ mod test_spec { vec![ HomePartitionConfig { remote_cluster: "boat1".to_string(), - remote_replica: "boats-0".to_string() + remote_replica: "boats-0".to_string(), + 
..Default::default() }, HomePartitionConfig { remote_cluster: "boat2".to_string(), - remote_replica: "boats-0".to_string() + remote_replica: "boats-0".to_string(), + ..Default::default() } ] .into() diff --git a/crates/fluvio-controlplane-metadata/src/topic/spec.rs b/crates/fluvio-controlplane-metadata/src/topic/spec.rs index 71e926d9cf..855998402f 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/spec.rs @@ -1,4 +1,4 @@ -use std::ops::Deref; +use std::ops::{Deref, DerefMut}; use anyhow::{anyhow, Result}; @@ -11,7 +11,7 @@ use fluvio_types::SpuId; use fluvio_types::{PartitionId, PartitionCount, ReplicationFactor, IgnoreRackAssignment}; use fluvio_protocol::{Encoder, Decoder}; -use crate::partition::{PartitionMirrorConfig, RemotePartitionConfig, HomePartitionConfig}; +use crate::partition::{HomePartitionConfig, PartitionMirrorConfig, RemotePartitionConfig}; use super::deduplication::Deduplication; @@ -275,7 +275,22 @@ impl ReplicaSpec { match self { Self::Computed(_) => "computed", Self::Assigned(_) => "assigned", - Self::Mirror(_) => "mirror", + Self::Mirror(mirror) => match mirror { + MirrorConfig::Remote(remote_config) => { + if remote_config.target { + "from-home" + } else { + "to-home" + } + } + MirrorConfig::Home(home_config) => { + if home_config.0.source { + "from-remote" + } else { + "to-remote" + } + } + }, } } @@ -645,26 +660,58 @@ impl MirrorConfig { } } + /// Set home to remote replication + pub fn set_home_to_remote(&mut self, home_to_remote: bool) -> Result<()> { + match self { + Self::Remote(_) => Err(anyhow!( + "remote mirror config cannot be set to home to remote" + )), + Self::Home(home) => { + home.set_home_to_remote(home_to_remote); + Ok(()) + } + } + } + /// Validate partition map for assigned topics pub fn validate(&self) -> anyhow::Result<()> { Ok(()) } } -#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)] +type Partitions = Vec; + +#[cfg_attr( + feature = "use_serde", + derive(serde::Serialize), + serde(rename_all = "camelCase", untagged) +)] +enum MultiHome { + V1(Partitions), + V2(HomeMirrorInner), +} + +#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)] #[cfg_attr( feature = "use_serde", derive(serde::Serialize, serde::Deserialize), serde(rename_all = "camelCase") )] pub struct HomeMirrorConfig( - #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Vec::is_empty"))] - Vec, + #[cfg_attr(feature = "use_serde", serde(deserialize_with = "from_home_v1"))] HomeMirrorInner, ); -impl From> for HomeMirrorConfig { - fn from(partitions: Vec) -> Self { - Self(partitions) +impl Deref for HomeMirrorConfig { + type Target = HomeMirrorInner; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for HomeMirrorConfig { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } } @@ -672,19 +719,145 @@ impl HomeMirrorConfig { /// generate home config from simple mirror cluster list /// this uses home topic to generate remote replicas pub fn from_simple(topic: &str, remote_clusters: Vec) -> Self { - Self( - remote_clusters + Self(HomeMirrorInner { + partitions: remote_clusters .into_iter() .map(|remote_cluster| HomePartitionConfig { remote_cluster, remote_replica: { ReplicaKey::new(topic, 0_u32).to_string() }, + ..Default::default() }) .collect(), - ) + source: false, + }) } +} + +cfg_if::cfg_if! 
{ + if #[cfg(feature = "use_serde")] { + impl<'de> serde::Deserialize<'de> for MultiHome { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct MultiHomeVisitor; + + impl<'de> serde::de::Visitor<'de> for MultiHomeVisitor { + type Value = MultiHome; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("an array or an object") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut elements = vec![]; + while let Some(value) = seq.next_element::()? { + elements.push(value); + } + Ok(MultiHome::V1(elements)) + } + + fn visit_map(self, map: M) -> Result + where + M: serde::de::MapAccess<'de>, + { + use serde::de::value::MapAccessDeserializer; + let obj: HomeMirrorInner = serde::Deserialize::deserialize(MapAccessDeserializer::new(map))?; + Ok(MultiHome::V2(obj)) + } + } + deserializer.deserialize_any(MultiHomeVisitor) + } + } + + fn from_home_v1<'de, D>(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let home: MultiHome = serde::Deserialize::deserialize(deserializer)?; + match home { + MultiHome::V1(v1) => Ok(HomeMirrorInner { + partitions: v1, + source: false, + }), + MultiHome::V2(v2) => Ok(v2), + } + } + } +} + +#[derive(Default, Debug, Clone, Eq, PartialEq)] +#[cfg_attr( + feature = "use_serde", + derive(serde::Serialize, serde::Deserialize), + serde(rename_all = "camelCase") +)] +pub struct HomeMirrorInner { + #[cfg_attr(feature = "use_serde", serde(default))] + pub partitions: Vec, + #[cfg_attr( + feature = "use_serde", + serde(skip_serializing_if = "crate::is_false", default) + )] + pub source: bool, // source of mirror +} + +impl Encoder for HomeMirrorInner { + fn write_size(&self, version: i16) -> usize { + if version < 18 { + self.partitions.write_size(version) + } else { + self.partitions.write_size(version) + self.source.write_size(version) + } + } + + fn encode( + &self, + dest: &mut T, + version: fluvio_protocol::Version, + ) -> std::result::Result<(), std::io::Error> + where + T: bytes::BufMut, + { + if version < 18 { + self.partitions.encode(dest, version)?; + } else { + self.partitions.encode(dest, version)?; + self.source.encode(dest, version)?; + } + Ok(()) + } +} + +impl Decoder for HomeMirrorInner { + fn decode(&mut self, src: &mut T, version: i16) -> std::result::Result<(), std::io::Error> + where + T: bytes::Buf, + { + self.partitions.decode(src, version)?; + if version >= 18 { + self.source.decode(src, version)?; + } + Ok(()) + } +} + +impl From> for HomeMirrorConfig { + fn from(partitions: Vec) -> Self { + Self(HomeMirrorInner { + partitions, + source: false, + }) + } +} + +impl HomeMirrorInner { pub fn partition_count(&self) -> PartitionCount { - self.0.len() as PartitionCount + self.partitions.len() as PartitionCount } pub fn replication_factor(&self) -> Option { @@ -692,12 +865,12 @@ impl HomeMirrorConfig { } pub fn partitions(&self) -> &Vec { - &self.0 + &self.partitions } pub fn as_partition_maps(&self) -> PartitionMaps { let mut maps = vec![]; - for (partition_id, home_partition) in self.0.iter().enumerate() { + for (partition_id, home_partition) in self.partitions.iter().enumerate() { maps.push(PartitionMap { id: partition_id as u32, mirror: Some(PartitionMirrorConfig::Home(home_partition.clone())), @@ -714,7 +887,15 @@ impl HomeMirrorConfig { /// Add partition to home mirror config pub fn add_partition(&mut self, partition: HomePartitionConfig) { - self.0.push(partition); + 
self.partitions.push(partition); + } + + /// set home to remote replication + pub fn set_home_to_remote(&mut self, home_to_remote: bool) { + self.source = home_to_remote; + self.partitions.iter_mut().for_each(|partition| { + partition.source = home_to_remote; + }); } } @@ -735,8 +916,12 @@ pub struct HomeMirrorPartition { serde(rename_all = "camelCase") )] pub struct RemoteMirrorConfig { + // source of mirror pub home_cluster: String, pub home_spus: Vec, + #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "crate::is_false"))] + #[fluvio(min_version = 18)] + pub target: bool, } #[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)] @@ -774,6 +959,7 @@ impl RemoteMirrorConfig { home_spu_id: home_spu.id, home_cluster: self.home_cluster.clone(), home_spu_endpoint: home_spu.endpoint.clone(), + target: self.target, })), ..Default::default() }); @@ -1122,6 +1308,13 @@ mod test { let spec2 = ReplicaSpec::new_assigned(p2); assert_eq!(spec2.partition_map_str(), Some("".to_string())); } + + #[test] + fn test_deserialize_home_mirror_config() { + let data = r#"{"partitions":[{"remoteCluster":"boat1","remoteReplica":"boats-0","source":false},{"remoteCluster":"boat2","remoteReplica":"boats-0","source":false}]}"#; + let home_mirror: HomeMirrorConfig = serde_json::from_str(data).expect("deserialize"); + assert_eq!(home_mirror.partitions().len(), 2); + } } #[cfg(test)] @@ -1144,6 +1337,7 @@ mod mirror_test { mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig { remote_replica: "boats-0".to_string(), remote_cluster: "boat1".to_owned(), + ..Default::default() })), ..Default::default() }, @@ -1152,6 +1346,7 @@ mod mirror_test { mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig { remote_replica: "boats-0".to_string(), remote_cluster: "boat2".to_string(), + ..Default::default() })), replicas: vec![], }, diff --git a/crates/fluvio-controlplane-metadata/src/topic/update.rs b/crates/fluvio-controlplane-metadata/src/topic/update.rs index 832f791315..bcc04aa46c 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/update.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/update.rs @@ -8,6 +8,8 @@ pub struct AddPartition { #[derive(Debug, Default, Encoder, Decoder, Clone)] pub struct AddMirror { pub remote_cluster: String, + // if set, this is mirror home + pub home_to_mirror: bool, } #[derive(Debug, Encoder, Decoder, Clone)] diff --git a/crates/fluvio-controlplane-metadata/tests/k8_topic_mirror_down_v1.json b/crates/fluvio-controlplane-metadata/tests/k8_topic_mirror_down_v1.json new file mode 100644 index 0000000000..99d55a53db --- /dev/null +++ b/crates/fluvio-controlplane-metadata/tests/k8_topic_mirror_down_v1.json @@ -0,0 +1,32 @@ +{ + "apiVersion": "fluvio.infinyon.com/v2", + "kind": "Topic", + "metadata": { + "annotations": { + "kubectl.kubernetes.io/last-applied-configuration": "{\"apiVersion\":\"fluvio.infinyon.com/v2\",\"kind\":\"Topic\",\"metadata\":{\"annotations\":{},\"name\":\"test3\",\"namespace\":\"default\"},\"spec\":{\"compressionType\":\"Any\",\"replicas\":{\"mirror\":{\"home\":{\"remoteClusters\":[\"boat1\",\"boat2\"]}}}}}\n" + }, + "creationTimestamp": "2023-08-17T17:43:09Z", + "generation": 1, + "name": "downstream-topic", + "namespace": "default", + "resourceVersion": "253907", + "uid": "1694c901-e94d-4f0b-a01f-1a09889b14fd" + }, + "spec": { + "compressionType": "Any", + "replicas": { + "mirror": { + "home": [ + { + "remoteCluster": "boat1", + "remoteReplica": "boats-0" + }, + { + "remoteCluster": "boat2", + "remoteReplica": "boats-0" + } + ] + 
} + } + } +} diff --git a/crates/fluvio-controlplane-metadata/tests/k8_topic_mirror_down_v2.json b/crates/fluvio-controlplane-metadata/tests/k8_topic_mirror_down_v2.json index 99d55a53db..c4f40defa1 100644 --- a/crates/fluvio-controlplane-metadata/tests/k8_topic_mirror_down_v2.json +++ b/crates/fluvio-controlplane-metadata/tests/k8_topic_mirror_down_v2.json @@ -16,7 +16,8 @@ "compressionType": "Any", "replicas": { "mirror": { - "home": [ + "home": { + "partitions": [ { "remoteCluster": "boat1", "remoteReplica": "boats-0" @@ -24,8 +25,9 @@ { "remoteCluster": "boat2", "remoteReplica": "boats-0" - } - ] + }], + "source": false + } } } } diff --git a/crates/fluvio-protocol/Cargo.toml b/crates/fluvio-protocol/Cargo.toml index 2f1fb3788a..9ce68b6992 100644 --- a/crates/fluvio-protocol/Cargo.toml +++ b/crates/fluvio-protocol/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-protocol" edition = "2021" -version = "0.11.0" +version = "0.12.0" authors = ["Fluvio Contributors "] description = "Fluvio streaming protocol" repository = "https://github.com/infinyon/fluvio" diff --git a/crates/fluvio-protocol/src/link/error_code.rs b/crates/fluvio-protocol/src/link/error_code.rs index 203d499137..ff34a01b87 100644 --- a/crates/fluvio-protocol/src/link/error_code.rs +++ b/crates/fluvio-protocol/src/link/error_code.rs @@ -220,6 +220,9 @@ pub enum ErrorCode { #[fluvio(tag = 11005)] #[error("the mirror is invalid")] MirrorInvalidType, + #[fluvio(tag = 11006)] + #[error("produce from remote target is not allowed")] + MirrorProduceFromRemoteNotAllowed, // Specs #[fluvio(tag = 12001)] diff --git a/crates/fluvio-sc-schema/Cargo.toml b/crates/fluvio-sc-schema/Cargo.toml index fd694b986e..d7da80d2ca 100644 --- a/crates/fluvio-sc-schema/Cargo.toml +++ b/crates/fluvio-sc-schema/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-sc-schema" -version = "0.24.3" +version = "0.25.0" edition = "2021" authors = ["Fluvio Contributors "] description = "Fluvio API for SC" diff --git a/crates/fluvio-sc-schema/src/objects/mod.rs b/crates/fluvio-sc-schema/src/objects/mod.rs index b2f0e99953..2c0a163b38 100644 --- a/crates/fluvio-sc-schema/src/objects/mod.rs +++ b/crates/fluvio-sc-schema/src/objects/mod.rs @@ -15,7 +15,7 @@ pub use list::*; pub use watch::*; pub use metadata::*; -pub(crate) const COMMON_VERSION: i16 = 17; // from now, we use a single version for all objects +pub(crate) const COMMON_VERSION: i16 = 18; // from now, we use a single version for all objects pub(crate) const DYN_OBJ: i16 = 11; // version indicate dynamic object #[cfg(test)] diff --git a/crates/fluvio-sc/src/controllers/mirroring/controller.rs b/crates/fluvio-sc/src/controllers/mirroring/controller.rs index 91e64e1d68..c5b5ba7ca3 100644 --- a/crates/fluvio-sc/src/controllers/mirroring/controller.rs +++ b/crates/fluvio-sc/src/controllers/mirroring/controller.rs @@ -59,22 +59,26 @@ impl RemoteMirrorController { } let wait = backoff.wait(); + info!(wait = wait.as_millis(), "waiting(ms) before retry"); sleep(wait).await; } } } - #[instrument(skip(self))] + #[instrument(skip(self, backoff))] async fn inner_loop(&self, backoff: &mut ExponentialBackoff) -> Result<()> { loop { if let Some((home, _)) = self.get_mirror_home_cluster().await { - debug!("initializing listeners"); + info!(home = %home.id, "connected to home cluster"); let home_config = self.build_home_client(&home).await?; let mut stream = self.request_stream(&home, home_config).await?; + info!("created request stream"); + while let Some(response) = stream.next().await { match response { 
Ok(response) => { + info!("received from home"); debug!("received response: {:#?}", response); let request_topics_keys = response .topics @@ -210,6 +214,14 @@ impl RemoteMirrorController { // Sync the mirror topic async fn sync_topic(&self, home: &Home, topic: &MirroringSpecWrapper) -> Result<()> { + let home_spec = match topic.spec.replicas() { + ReplicaSpec::Mirror(MirrorConfig::Home(h)) => h, + _ => { + return Err(anyhow!("topic {} should be home mirror ", topic.key)); + } + }; + + info!(home_spec.source, "home is source"); // Create a new replica spec for the topic let new_replica: ReplicaSpec = ReplicaSpec::Mirror(MirrorConfig::Remote(RemoteMirrorConfig { @@ -222,6 +234,7 @@ impl RemoteMirrorController { 1 ], home_cluster: home.id.clone(), + target: home_spec.source, })); // Check if the topic already exists diff --git a/crates/fluvio-sc/src/controllers/topics/policy.rs b/crates/fluvio-sc/src/controllers/topics/policy.rs index 5c1b84e8aa..d027cf2bd0 100644 --- a/crates/fluvio-sc/src/controllers/topics/policy.rs +++ b/crates/fluvio-sc/src/controllers/topics/policy.rs @@ -298,6 +298,7 @@ impl TopicNextState { home_spu_key: spu.key.clone(), home_cluster: src.home_cluster.clone(), home_spu_endpoint: spu.endpoint.clone(), + target: src.target, }), ); } diff --git a/crates/fluvio-sc/src/services/auth/basic.rs b/crates/fluvio-sc/src/services/auth/basic.rs index 27ec229bca..16e42629ec 100644 --- a/crates/fluvio-sc/src/services/auth/basic.rs +++ b/crates/fluvio-sc/src/services/auth/basic.rs @@ -324,8 +324,8 @@ mod test { default_role.insert( ObjectType::Mirror, vec![ - ActionUrn::new(Action::Read, Some("edge1".to_string())), - ActionUrn::new(Action::Read, Some("edge2".to_string())), + ActionUrn::new(Action::Read, Some("remote1".to_string())), + ActionUrn::new(Action::Read, Some("remote2".to_string())), ], ); diff --git a/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs b/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs index def7f03078..101e469689 100644 --- a/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs +++ b/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs @@ -167,7 +167,7 @@ impl RemoteFetchingFromHomeController { Err(anyhow!("remote cluster not found: {}", self.req.remote_id)) } - #[instrument(skip(self))] + #[instrument(skip(self, topics_listener, spus_listener))] async fn sync_and_send_topics( &mut self, topics_listener: &mut ChangeListener, diff --git a/crates/fluvio-sc/src/services/public_api/topic/update/add_mirror.rs b/crates/fluvio-sc/src/services/public_api/topic/update/add_mirror.rs index 254e3e04a4..04d689f1ac 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/update/add_mirror.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/update/add_mirror.rs @@ -104,6 +104,7 @@ pub async fn handle_add_mirror( let new_home_partition_config = HomePartitionConfig { remote_cluster: request.remote_cluster, remote_replica: { ReplicaKey::new(topic.key(), 0_u32).to_string() }, + source: home_config.source, }; new_home_config.add_partition(new_home_partition_config); spec.set_replicas(ReplicaSpec::Mirror(MirrorConfig::Home(new_home_config))); diff --git a/crates/fluvio-smartmodule/Cargo.toml b/crates/fluvio-smartmodule/Cargo.toml index 67997c444e..e47d2af2d3 100644 --- a/crates/fluvio-smartmodule/Cargo.toml +++ b/crates/fluvio-smartmodule/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-smartmodule" -version = "0.7.5" +version = "0.8.0" edition = "2021" license = "Apache-2.0" authors = ["Fluvio Contributors "] diff --git 
a/crates/fluvio-spu-schema/Cargo.toml b/crates/fluvio-spu-schema/Cargo.toml index 827d5ff725..7f2d7d57ba 100644 --- a/crates/fluvio-spu-schema/Cargo.toml +++ b/crates/fluvio-spu-schema/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-spu-schema" -version = "0.16.1" +version = "0.17.0" edition = "2021" authors = ["Fluvio Contributors "] description = "Fluvio API for SPU" diff --git a/crates/fluvio-spu/src/control_plane/dispatcher.rs b/crates/fluvio-spu/src/control_plane/dispatcher.rs index ac81f7cdc8..2777157a8c 100644 --- a/crates/fluvio-spu/src/control_plane/dispatcher.rs +++ b/crates/fluvio-spu/src/control_plane/dispatcher.rs @@ -1,7 +1,5 @@ use std::time::Duration; -use fluvio_controlplane::sc_api::update_mirror::UpdateMirrorStatRequest; -use fluvio_controlplane::spu_api::update_mirror::UpdateMirrorRequest; use tracing::{info, trace, error, debug, warn, instrument}; use tokio::select; use futures_util::stream::StreamExt; @@ -19,6 +17,8 @@ use fluvio_future::timer::sleep; use fluvio_protocol::api::RequestMessage; use fluvio_socket::{FluvioSocket, FluvioSink}; use fluvio_storage::FileReplica; +use fluvio_controlplane::sc_api::update_mirror::UpdateMirrorStatRequest; +use fluvio_controlplane::spu_api::update_mirror::UpdateMirrorRequest; use crate::core::SharedGlobalContext; diff --git a/crates/fluvio-spu/src/core/global_context.rs b/crates/fluvio-spu/src/core/global_context.rs index 19a9cbd550..4ecd76c93c 100644 --- a/crates/fluvio-spu/src/core/global_context.rs +++ b/crates/fluvio-spu/src/core/global_context.rs @@ -143,6 +143,10 @@ where &self.spu_followers } + pub fn follower_notifier_owned(&self) -> Arc { + self.spu_followers.clone() + } + #[allow(unused)] pub fn status_update(&self) -> &StatusLrsMessageSink { &self.lrs_status_update diff --git a/crates/fluvio-spu/src/mirroring/home/api_key.rs b/crates/fluvio-spu/src/mirroring/home/api_key.rs index 8638886bb2..7f54b831cf 100644 --- a/crates/fluvio-spu/src/mirroring/home/api_key.rs +++ b/crates/fluvio-spu/src/mirroring/home/api_key.rs @@ -6,4 +6,5 @@ use fluvio_protocol::{Encoder, Decoder}; pub enum MirrorHomeApiEnum { #[default] UpdateHomeOffset = 0, + SyncRecords = 1, } diff --git a/crates/fluvio-spu/src/mirroring/home/connection.rs b/crates/fluvio-spu/src/mirroring/home/connection.rs index 2d30790650..afa6e537e6 100644 --- a/crates/fluvio-spu/src/mirroring/home/connection.rs +++ b/crates/fluvio-spu/src/mirroring/home/connection.rs @@ -1,56 +1,71 @@ use std::time::Duration; use std::{fmt, sync::Arc}; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use tokio::select; -use tracing::{debug, error, instrument, warn}; -use anyhow::Result; +use tracing::{debug, error, info, instrument, warn}; +use anyhow::{Result, anyhow}; +use futures_util::StreamExt; use fluvio_auth::{AuthContext, InstanceAction}; use fluvio_controlplane_metadata::mirror::{MirrorPairStatus, MirrorType}; use fluvio_controlplane_metadata::extended::ObjectType; use fluvio_future::timer::sleep; -use fluvio_protocol::api::RequestMessage; +use fluvio_protocol::{record::Offset, api::RequestMessage}; use fluvio_spu_schema::server::mirror::StartMirrorRequest; -use futures_util::StreamExt; use fluvio_socket::{ExclusiveFlvSink, FluvioStream}; +use fluvio::Isolation; use crate::control_plane::SharedMirrorStatusUpdate; use crate::core::DefaultSharedGlobalContext; use crate::mirroring::remote::api_key::MirrorRemoteApiEnum; use crate::mirroring::remote::remote_api::RemoteMirrorRequest; -use 
crate::mirroring::remote::sync::DefaultPartitionSyncRequest; +use crate::mirroring::remote::sync::{DefaultRemotePartitionSyncRequest, MirrorPartitionSyncRequest}; +use crate::mirroring::remote::update_offsets::UpdateRemoteOffsetRequest; use crate::replication::leader::SharedFileLeaderState; use crate::services::auth::SpuAuthServiceContext; +use super::sync::HomeFilePartitionSyncRequest; use super::update_offsets::UpdateHomeOffsetRequest; const MIRROR_RECONCILIATION_INTERVAL_SEC: u64 = 60; // 1 min +const UNKNOWN_LEO: i64 = -1; + pub(crate) struct MirrorRequestMetrics { loop_count: AtomicU64, + remote_leo: AtomicI64, } impl MirrorRequestMetrics { pub(crate) fn new() -> Self { Self { loop_count: AtomicU64::new(0), + remote_leo: AtomicI64::new(UNKNOWN_LEO), } } fn increase_loop_count(&self) { - self.loop_count - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.loop_count.fetch_add(1, Ordering::Relaxed); } fn get_loop_count(&self) -> u64 { - self.loop_count.load(std::sync::atomic::Ordering::Relaxed) + self.loop_count.load(Ordering::Relaxed) + } + + fn get_remote_leo(&self) -> i64 { + self.remote_leo.load(Ordering::SeqCst) + } + + fn update_remote_leo(&self, leo: Offset) { + self.remote_leo.store(leo, Ordering::SeqCst); } } /// Handle mirror request from remote pub(crate) struct MirrorHomeHandler { metrics: Arc, + /// leader replicat that will be mirrored leader: SharedFileLeaderState, ctx: DefaultSharedGlobalContext, status_update: SharedMirrorStatusUpdate, @@ -113,7 +128,7 @@ impl MirrorHomeHandler { let remote_replica = req_msg.request.remote_replica; let remote_cluster_id = req_msg.request.remote_cluster_id; - if let Some(leader) = auth_ctx + if let Some((leader, source)) = auth_ctx .global_ctx .leaders_state() .find_mirror_home_leader(&remote_cluster_id, &remote_replica) @@ -131,7 +146,21 @@ impl MirrorHomeHandler { remote_cluster_id: remote_cluster_id.clone(), }; - if let Err(err) = handler.inner_respond(sink, stream).await { + if source { + if let Err(err) = handler.respond_as_source(sink, stream).await { + error!("error handling mirror request: {:#?}", err); + + if let Err(err) = mirror_status_update + .send_status( + remote_cluster_id.clone(), + MirrorPairStatus::DetailFailure(err.to_string()), + ) + .await + { + error!("error updating status: {}", err); + } + } + } else if let Err(err) = handler.respond_as_target(sink, stream).await { error!("error handling mirror request: {:#?}", err); if let Err(err) = mirror_status_update @@ -152,8 +181,8 @@ impl MirrorHomeHandler { } } - /// main respond handler - async fn inner_respond( + /// respond to mirror request from remote as target + async fn respond_as_target( self, mut sink: ExclusiveFlvSink, mut stream: FluvioStream, @@ -191,6 +220,9 @@ impl MirrorHomeHandler { RemoteMirrorRequest::SyncRecords(sync_request)=> { self.sync_record_from_remote(&mut sink,sync_request.request).await?; } + RemoteMirrorRequest::UpdateRemoteOffset(_req) => { + return Err(anyhow!("received offset request from remote, this should not happen, since we are target")); + } } } else { @@ -233,7 +265,7 @@ impl MirrorHomeHandler { async fn sync_record_from_remote( &self, sink: &mut ExclusiveFlvSink, - mut req: DefaultPartitionSyncRequest, + mut req: DefaultRemotePartitionSyncRequest, ) -> Result<()> { let append_flag = self .leader @@ -242,4 +274,197 @@ impl MirrorHomeHandler { debug!(append_flag, "leader appended"); self.send_offsets_to_remote(sink).await } + + /// respond to mirror request from remote as source + async fn respond_as_source( + self, + sink: 
ExclusiveFlvSink, + mut stream: FluvioStream, + ) -> Result<()> { + // first send + let mut api_stream = stream.api_stream::(); + + // if remote is behind, we need to update remote + let mut remote_updated_needed = false; + + let mut leader_offset_listener = self.leader.offset_listener(&Isolation::ReadUncommitted); + + #[allow(unused_assignments)] + loop { + let remote_leo = self.metrics.get_remote_leo(); + debug!( + counter = self.metrics.get_loop_count(), + remote_leo, remote_updated_needed, "waiting for mirror event" + ); + + // send missing records to remote if remote is behind + + if remote_updated_needed && remote_leo >= 0 { + self.send_records_to_remote(&sink, remote_leo).await?; + remote_updated_needed = false; + } + + select! { + _ = leader_offset_listener.listen() => { + info!("leader offset has changed, remote cluster needs to be updated"); + remote_updated_needed = true; + }, + + + remote_msg = api_stream.next() => { + if let Some(req_msg_res) = remote_msg { + let req_msg = req_msg_res?; + + match req_msg { + RemoteMirrorRequest::SyncRecords(_sync_request)=> { + return Err(anyhow!("received sync request from remote, this should not happen, since we are source")); + } + RemoteMirrorRequest::UpdateRemoteOffset(req) => { + remote_updated_needed = self.update_from_remote(req)?; + } + } + + } else { + self.update_status(MirrorPairStatus::DetailFailure("closed connection".to_owned())).await?; + debug!("leader socket has terminated"); + break; + } + } + } + + self.metrics.increase_loop_count(); + } + + info!("remote has closed connection, terminating"); + + Ok(()) + } + + #[instrument(skip(self, sink))] + async fn send_records_to_remote( + &self, + sink: &ExclusiveFlvSink, + remote_leo: Offset, + ) -> Result<()> { + debug!("updating home cluster"); + if let Some(sync_request) = self.generate_home_records_as_source(remote_leo).await? 
{ + debug!(?sync_request, "home sync"); + let request = RequestMessage::new_request(sync_request) + .set_client_id(format!("leader: {}", self.leader.id())); + + let mut inner_sink = sink.lock().await; + inner_sink + .encode_file_slices(&request, request.header.api_version()) + .await?; + Ok(()) + } else { + Ok(()) + } + } + + /// home is source, generate missing records to send to \remote + async fn generate_home_records_as_source( + &self, + remote_leo: Offset, + ) -> Result> { + const MAX_BYTES: u32 = 1024 * 1024; // 1MB + + // leader off should be always greater than remote leo + let leader_offset = self.leader.as_offset(); + + // if remote mirror is all caught up, there is no need to send out update + if leader_offset.leo == remote_leo { + debug!("remote has caught up, just chilling out"); + return Ok(None); + } + + let mut partition_response = MirrorPartitionSyncRequest { + leo: leader_offset.leo, + hw: leader_offset.hw, + ..Default::default() + }; + + if leader_offset.leo > remote_leo { + match self + .leader + .read_records(remote_leo, MAX_BYTES, Isolation::ReadUncommitted) + .await + { + // leader offset is greater than home, we need to send records to home (default) + Ok(slice) => { + debug!( + hw = slice.end.hw, + leo = slice.end.leo, + replica = %self.leader.id(), + "read records" + ); + if let Some(file_slice) = slice.file_slice { + partition_response.records = file_slice.into(); + } + Ok(Some(partition_response.into())) + } + Err(err) => { + error!(%err, "error reading records"); + Err(anyhow!("error reading records: {}", err)) + } + } + } else { + // home has more records, then we sync copy records from home + debug!( + hw = leader_offset.hw, + leo = leader_offset.leo, + remote_leo, + "oh no mirror home has more records" + ); + Err(anyhow!( + "leader has more records than home, this should not happen" + )) + } + } + + // received new offset from edge, this happens when edge is behind + #[instrument(skip(req))] + fn update_from_remote(&self, req: RequestMessage) -> Result { + let leader_leo = self.leader.leo(); + let old_remote_leo = self.metrics.get_remote_leo(); + let new_remote_leo = req.request.offset().leo; + + debug!( + leader_leo, + old_remote_leo, new_remote_leo, "received update from remote" + ); + // if old home leo is not initialized, we need to update home + if old_remote_leo < 0 { + debug!(new_remote_leo, "updating remote leo from uninitialized"); + self.metrics.update_remote_leo(new_remote_leo); + } + + // check how far remote is behind + match new_remote_leo.cmp(&leader_leo) { + std::cmp::Ordering::Greater => { + // remote leo should never be greater than leader's leo + warn!( + leader_leo, + new_remote_leo, + "remote has more records, this should not happen, this is error" + ); + return Err(anyhow!("remote's leo: {new_remote_leo} > leader's leo: {leader_leo} this should not happen, this is error")); + } + std::cmp::Ordering::Less => { + debug!( + new_remote_leo, + leader_leo, "remote has less records, need to refresh home" + ); + self.metrics.update_remote_leo(new_remote_leo); + Ok(true) + } + std::cmp::Ordering::Equal => { + debug!( + new_remote_leo, + "remote has same records, no need to refresh home" + ); + Ok(false) + } + } + } } diff --git a/crates/fluvio-spu/src/mirroring/home/home_api.rs b/crates/fluvio-spu/src/mirroring/home/home_api.rs index 8d6d01ee62..39bd6def8f 100644 --- a/crates/fluvio-spu/src/mirroring/home/home_api.rs +++ b/crates/fluvio-spu/src/mirroring/home/home_api.rs @@ -7,6 +7,8 @@ use fluvio_protocol::bytes::Buf; use 
fluvio_protocol::Decoder; use fluvio_protocol::api::{RequestMessage, ApiMessage, RequestHeader}; +use crate::mirroring::home::sync::DefaultHomePartitionSyncRequest; + use super::api_key::MirrorHomeApiEnum; use super::update_offsets::UpdateHomeOffsetRequest; @@ -14,6 +16,7 @@ use super::update_offsets::UpdateHomeOffsetRequest; #[derive(Debug)] pub enum HomeMirrorRequest { UpdateHomeOffset(RequestMessage), + SyncRecords(RequestMessage), } impl Default for HomeMirrorRequest { @@ -38,6 +41,10 @@ impl ApiMessage for HomeMirrorRequest { header, UpdateHomeOffsetRequest::decode_from(src, version)?, ))), + MirrorHomeApiEnum::SyncRecords => Ok(Self::SyncRecords(RequestMessage::new( + header, + DefaultHomePartitionSyncRequest::decode_from(src, version)?, + ))), } } } diff --git a/crates/fluvio-spu/src/mirroring/home/mod.rs b/crates/fluvio-spu/src/mirroring/home/mod.rs index 04a733e539..95716d3c5f 100644 --- a/crates/fluvio-spu/src/mirroring/home/mod.rs +++ b/crates/fluvio-spu/src/mirroring/home/mod.rs @@ -2,3 +2,4 @@ pub(crate) mod connection; pub(crate) mod api_key; pub(crate) mod home_api; pub(crate) mod update_offsets; +pub(crate) mod sync; diff --git a/crates/fluvio-spu/src/mirroring/home/sync.rs b/crates/fluvio-spu/src/mirroring/home/sync.rs new file mode 100644 index 0000000000..c60b247ea1 --- /dev/null +++ b/crates/fluvio-spu/src/mirroring/home/sync.rs @@ -0,0 +1,67 @@ +use std::fmt::Debug; +use std::io::Error as IoError; + +use bytes::BytesMut; +use tracing::trace; + +use fluvio_protocol::store::StoreValue; +use fluvio_protocol::store::FileWrite; +use fluvio_protocol::{Encoder, Decoder, Version}; +use fluvio_protocol::record::RecordSet; +use fluvio_protocol::api::Request; +use fluvio_protocol::record::RawRecords; +use fluvio_spu_schema::file::FileRecordSet; + +use crate::mirroring::remote::sync::MirrorPartitionSyncRequest; +use crate::mirroring::remote::sync::MirrorPartitionSyncResponse; +use crate::mirroring::COMMON_MIRROR_VERSION; + +use super::api_key::MirrorHomeApiEnum; + +pub type HomeFilePartitionSyncRequest = MirrorPartitionSyncRequestWrapper; +pub type DefaultHomePartitionSyncRequest = MirrorPartitionSyncRequestWrapper>; + +#[derive(Encoder, Decoder, Default, Debug)] +pub(crate) struct MirrorPartitionSyncRequestWrapper( + MirrorPartitionSyncRequest, +); + +impl From> for MirrorPartitionSyncRequestWrapper +where + R: Encoder + Decoder + Debug, +{ + fn from(request: MirrorPartitionSyncRequest) -> Self { + Self(request) + } +} + +impl Request for MirrorPartitionSyncRequestWrapper +where + R: Encoder + Decoder + Debug, +{ + const API_KEY: u16 = MirrorHomeApiEnum::SyncRecords as u16; + const DEFAULT_API_VERSION: i16 = COMMON_MIRROR_VERSION; + type Response = MirrorPartitionSyncResponse; +} + +impl MirrorPartitionSyncRequestWrapper +where + R: Encoder + Decoder + Debug, +{ + pub fn inner(self) -> MirrorPartitionSyncRequest { + self.0 + } +} + +impl FileWrite for HomeFilePartitionSyncRequest { + fn file_encode( + &self, + src: &mut BytesMut, + data: &mut Vec, + version: Version, + ) -> Result<(), IoError> { + trace!("file encoding fetch partition response"); + self.0.file_encode(src, data, version)?; + Ok(()) + } +} diff --git a/crates/fluvio-spu/src/mirroring/home/update_offsets.rs b/crates/fluvio-spu/src/mirroring/home/update_offsets.rs index fb0db99500..5a5278e297 100644 --- a/crates/fluvio-spu/src/mirroring/home/update_offsets.rs +++ b/crates/fluvio-spu/src/mirroring/home/update_offsets.rs @@ -6,7 +6,7 @@ use crate::replication::leader::ReplicaOffsetRequest; use 
super::api_key::MirrorHomeApiEnum; -/// Update home's offset +/// Update home's offset to remote pub(crate) type UpdateHomeOffsetRequest = ReplicaOffsetRequest; impl Request for UpdateHomeOffsetRequest { diff --git a/crates/fluvio-spu/src/mirroring/mod.rs b/crates/fluvio-spu/src/mirroring/mod.rs index 81690750bf..275efcf806 100644 --- a/crates/fluvio-spu/src/mirroring/mod.rs +++ b/crates/fluvio-spu/src/mirroring/mod.rs @@ -4,4 +4,4 @@ pub(crate) mod home; #[cfg(test)] mod test; -const COMMON_MIRROR_VERSION: i16 = 0; +const COMMON_MIRROR_VERSION: i16 = 1; diff --git a/crates/fluvio-spu/src/mirroring/remote/api_key.rs b/crates/fluvio-spu/src/mirroring/remote/api_key.rs index c1a768b46c..0b67ae78c2 100644 --- a/crates/fluvio-spu/src/mirroring/remote/api_key.rs +++ b/crates/fluvio-spu/src/mirroring/remote/api_key.rs @@ -6,4 +6,5 @@ use fluvio_protocol::{Encoder, Decoder}; pub enum MirrorRemoteApiEnum { #[default] SyncRecords = 0, + UpdateEdgeOffset = 1, } diff --git a/crates/fluvio-spu/src/mirroring/remote/controller.rs b/crates/fluvio-spu/src/mirroring/remote/controller.rs index c585a63359..23a0ccfbf4 100644 --- a/crates/fluvio-spu/src/mirroring/remote/controller.rs +++ b/crates/fluvio-spu/src/mirroring/remote/controller.rs @@ -30,17 +30,20 @@ use fluvio_types::event::offsets::OffsetChangeListener; use crate::{ control_plane::SharedMirrorStatusUpdate, core::{mirror::SharedMirrorLocalStore, GlobalContext}, - replication::leader::SharedLeaderState, + mirroring::remote::update_offsets::UpdateRemoteOffsetRequest, + replication::leader::{FollowerNotifier, ReplicaOffsetRequest, SharedLeaderState}, }; use crate::mirroring::home::{ home_api::HomeMirrorRequest, api_key::MirrorHomeApiEnum, update_offsets::UpdateHomeOffsetRequest, }; -use super::sync::FilePartitionSyncRequest; +use super::sync::{DefaultRemotePartitionSyncRequest, RemoteFilePartitionSyncRequest}; pub(crate) type SharedMirrorControllerState = Arc; +const MIRROR_RECONCILIATION_INTERVAL_SEC: u64 = 60; // 1 min + /// Metrics for mirror controller #[derive(Debug)] pub(crate) struct MirrorControllerMetrics { @@ -130,6 +133,7 @@ pub(crate) struct MirrorRemoteToHomeController { status_update: SharedMirrorStatusUpdate, max_bytes: u32, isolation: Isolation, + follower_notifier: Arc, } impl fmt::Debug for MirrorRemoteToHomeController @@ -171,6 +175,7 @@ where max_bytes, mirror_store: ctx.mirrors_localstore_owned(), status_update: ctx.mirror_status_update_owned(), + follower_notifier: ctx.follower_notifier_owned(), }; spawn(controller.dispatch_loop()); state @@ -181,17 +186,25 @@ where let mut offset_listener = self.leader.offset_listener(&self.isolation); let mut backoff = create_backoff(); - debug!("initial delay to wait for home cluster to be ready"); - sleep(Duration::from_secs(CLUSTER_LOOKUP_SEC)).await; - loop { // first find home cluster if let Some(home) = self.find_home_cluster() { self.state.metrics.increase_loop_count(); debug!(name = home.id, "found home cluster"); let home_socket = self.create_socket_to_home(&mut backoff, &home).await; + debug!("created socket to home"); - if let Err(err) = self + if self.remote_config.target { + if let Err(err) = self + .sync_mirror_as_target(&home, home_socket, &mut backoff) + .await + { + self.update_status(MirrorPairStatus::DetailFailure(err.to_string())) + .await + .unwrap(); + error!("error syncing mirror loop {}", err); + } + } else if let Err(err) = self .sync_mirror_loop(&home, &mut offset_listener, home_socket, &mut backoff) .await { @@ -199,17 +212,15 @@ where .await .unwrap(); error!("error 
syncing mirror loop {}", err); - self.backoff_and_wait(&mut backoff).await; } } else { - warn!("home cluster not found, waiting..."); + warn!("home cluster not found"); sleep(Duration::from_secs(CLUSTER_LOOKUP_SEC)).await; } } } - #[instrument] - // main sync loop for each home connection + #[instrument(skip(home, leader_offset_listner, home_socket, tls, backoff))] async fn sync_mirror_loop( &self, home: &Home, @@ -217,7 +228,7 @@ where (home_socket, tls): (FluvioSocket, bool), backoff: &mut ExponentialBackoff, ) -> Result<()> { - debug!(home_id = home.id, "start syncing mirror"); + debug!(home_id = home.id, "start syncing mirror as source"); let (mut home_sink, mut home_stream) = home_socket.split(); @@ -242,7 +253,8 @@ where // update home if flag is set and we know what home leo is if home_updated_needed && home_leo >= 0 { - self.update_home(&mut home_sink, home_leo).await?; + self.update_remote_as_source(&mut home_sink, home_leo) + .await?; self.update_status(MirrorPairStatus::Succesful).await?; home_updated_needed = false; } @@ -260,11 +272,92 @@ where match home_msg { HomeMirrorRequest::UpdateHomeOffset(req)=> { - home_updated_needed = self.update_from_home(req)?; + home_updated_needed = self.update_from_home_as_source(req)?; + }, + HomeMirrorRequest::SyncRecords(sync_request)=> { + return Err(anyhow!("received sync record request from home, this should not happen, since we are source")); } } - backoff.reset(); self.update_status(MirrorPairStatus::Succesful).await?; + backoff.reset(); + } else { + warn!("spu socket to home has terminated"); + self.update_status(MirrorPairStatus::DetailFailure("closed connection".to_owned())) + .await?; + self.backoff_and_wait(backoff).await; + break; + } + + } + } + + self.state.metrics.increase_conn_count(); + } + + info!("home has closed connection, terminating loop"); + + Ok(()) + } + + #[instrument(skip(home, home_socket, tls, backoff))] + // sync loop when this is source + async fn sync_mirror_as_target( + &self, + home: &Home, + (home_socket, tls): (FluvioSocket, bool), + backoff: &mut ExponentialBackoff, + ) -> Result<()> { + info!(home_id = home.id, "start syncing mirror as target"); + + let (mut home_sink, mut home_stream) = home_socket.split(); + + if tls { + debug!("tls enabled, disabling zero copy sink"); + home_sink.disable_zerocopy(); + } + + let mut home_api_stream = home_stream.api_stream::(); + + self.send_initial_request(home, &mut home_sink).await?; + + let mut paired: bool = false; // pairing status + + self.send_offsets_to_home_as_target(&mut home_sink).await?; + + // timer to update offsets to home + let mut reconc_timer = sleep(Duration::from_secs(MIRROR_RECONCILIATION_INTERVAL_SEC)); + + // home_updated_needed triggers warning, despite being used in loop + #[allow(unused)] + loop { + select! 
{ + _ = &mut reconc_timer => { + info!("timer expired, sending reconciliation"); + self.send_offsets_to_home_as_target(&mut home_sink).await?; + reconc_timer = sleep(Duration::from_secs(MIRROR_RECONCILIATION_INTERVAL_SEC)); + }, + + msg = home_api_stream.next() => { + debug!("received response from home"); + if let Some(req_msg_home) = msg { + let home_msg = req_msg_home?; + + match home_msg { + HomeMirrorRequest::UpdateHomeOffset(req)=> { + return Err(anyhow!("received home offset request from home, this should not happen, since we are target")); + }, + HomeMirrorRequest::SyncRecords(sync_request)=> { + if(!paired) { + info!("sync received for the first time, indicating paired"); + self.update_status(MirrorPairStatus::Succesful).await?; + paired = true; + } + + self.sync_record_from_home(sync_request.request.inner()).await?; + self.send_offsets_to_home_as_target(&mut home_sink).await?; + } + } + backoff.reset(); } else { warn!("spu socket to home has terminated"); self.update_status(MirrorPairStatus::DetailFailure("closed connection".to_owned())) @@ -279,7 +372,7 @@ where self.state.metrics.increase_conn_count(); } - debug!("home has closed connection, terminating loop"); + info!("home has closed connection, terminating loop"); Ok(()) } @@ -292,12 +385,14 @@ where async fn send_initial_request(&self, home: &Home, home_sink: &mut FluvioSink) -> Result<()> { // always starts with mirrong request + // this is equivalent to register request + // home should perform additional validation to ensure invalid edge request are rejected let start_mirror_request = RequestMessage::new_request(StartMirrorRequest { remote_cluster_id: home.remote_id.clone(), remote_replica: self.leader.id().to_string(), }); - debug!("sending start mirror request: {:#?}", start_mirror_request); + info!(remote_id = home.remote_id, cluster = %self.leader.id(),"sending start mirror request"); // send start mirror request home_sink @@ -306,10 +401,12 @@ where .map_err(|err| err.into()) } - /// received new offset from home, update controller's knowledge - /// it will return true if home needs to be updated + /// received new offset from home #[instrument(skip(req))] - fn update_from_home(&self, req: RequestMessage) -> Result { + fn update_from_home_as_source( + &self, + req: RequestMessage, + ) -> Result { let leader_leo = self.leader.leo(); let old_home_leo = self.state.metrics.get_home_leo(); let new_home_leo = req.request.leo; @@ -324,12 +421,12 @@ where } match new_home_leo.cmp(&leader_leo) { std::cmp::Ordering::Greater => { - // home leo should never be greater than leader's leo + // home leo should never be greater than leader's leo if this is not mirror target warn!( leader_leo, new_home_leo, "home has more records, this should not happen, this is error" ); - return Err(anyhow!("home's leo: {new_home_leo} > leader's leo: {leader_leo} this should not happen, this is error")); + return Err(anyhow!("home's leo: {new_home_leo} > leader's leo: {leader_leo} this should not happen since this is target, this is error")); } std::cmp::Ordering::Less => { debug!( @@ -349,21 +446,6 @@ where } } - #[instrument] - async fn update_home(&self, sink: &mut FluvioSink, home_leo: Offset) -> Result<()> { - debug!("updating home cluster"); - if let Some(sync_request) = self.generate_home_sync(home_leo).await? 
{ - debug!(?sync_request, "home sync"); - let request = RequestMessage::new_request(sync_request) - .set_client_id(format!("leader: {}", self.leader.id())); - sink.encode_file_slices(&request, request.header.api_version()) - .await?; - Ok(()) - } else { - Ok(()) - } - } - /// look up home cluster from local store /// this may return None if remote cluster is send by SC by time controller is started fn find_home_cluster(&self) -> Option { @@ -381,11 +463,26 @@ where } } - /// compute records necessary to fill in gap for mirror home - async fn generate_home_sync( + #[instrument] + async fn update_remote_as_source(&self, sink: &mut FluvioSink, home_leo: Offset) -> Result<()> { + debug!("updating home cluster"); + if let Some(sync_request) = self.geneate_remote_record_as_source(home_leo).await? { + debug!(?sync_request, "home sync"); + let request = RequestMessage::new_request(sync_request) + .set_client_id(format!("leader: {}", self.leader.id())); + sink.encode_file_slices(&request, request.header.api_version()) + .await?; + Ok(()) + } else { + Ok(()) + } + } + + /// remote is source, generate missing records to send to home + async fn geneate_remote_record_as_source( &self, home_leo: Offset, - ) -> Result> { + ) -> Result> { // leader off should be always greater than remote leo let leader_offset = self.leader.as_offset(); @@ -395,7 +492,7 @@ where return Ok(None); } - let mut partition_response = FilePartitionSyncRequest { + let mut partition_response = RemoteFilePartitionSyncRequest { leo: leader_offset.leo, hw: leader_offset.hw, ..Default::default() @@ -407,6 +504,7 @@ where .read_records(home_leo, self.max_bytes, self.isolation) .await { + // leader offset is greater than home, we need to send records to home (default) Ok(slice) => { debug!( hw = slice.end.hw, @@ -425,7 +523,7 @@ where } } } else { - // + // home has more records, then we sync copy records from home debug!( hw = leader_offset.hw, leo = leader_offset.leo, @@ -438,6 +536,19 @@ where } } + #[instrument(skip(self, req))] + async fn sync_record_from_home( + &self, + mut req: DefaultRemotePartitionSyncRequest, + ) -> Result<()> { + let append_flag = self + .leader + .append_record_set(&mut req.records, &self.follower_notifier) + .await?; + debug!(append_flag, "leader appended"); + Ok(()) + } + /// create socket to home, this will always succeed #[instrument(skip(self, home))] async fn create_socket_to_home( @@ -505,6 +616,23 @@ where debug!("resume from backing off"); self.state.metrics.increase_conn_failure(); } + + // as target, send offset to home so it can sync records + async fn send_offsets_to_home_as_target(&self, sink: &mut FluvioSink) -> Result<()> { + let offset_request = ReplicaOffsetRequest { + replica: self.leader.id().clone(), + leo: self.leader.leo(), + hw: self.leader.hw(), + }; + + debug!(?offset_request, "sending offset to home"); + let req_msg: RequestMessage = + RequestMessage::new_request(offset_request.into()).set_client_id("mirror home"); + + sink.send_request(&req_msg).await?; + + Ok(()) + } } fn create_backoff() -> ExponentialBackoff { diff --git a/crates/fluvio-spu/src/mirroring/remote/mod.rs b/crates/fluvio-spu/src/mirroring/remote/mod.rs index 3517ac0a72..0b661ca376 100644 --- a/crates/fluvio-spu/src/mirroring/remote/mod.rs +++ b/crates/fluvio-spu/src/mirroring/remote/mod.rs @@ -2,3 +2,4 @@ pub(crate) mod controller; pub(crate) mod api_key; pub(crate) mod remote_api; pub(crate) mod sync; +pub(crate) mod update_offsets; diff --git a/crates/fluvio-spu/src/mirroring/remote/remote_api.rs 
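Two send paths show up in this controller: record syncs go through encode_file_slices so storage file slices can be passed along (zero copy unless TLS forced disable_zerocopy earlier), while the small one-way offset report uses the ordinary request encoder. An illustrative wrapper over those two calls, not part of the patch:

    // sync: a record batch destined for home; offsets: the target's offset report
    async fn send_to_home(
        sink: &mut FluvioSink,
        sync: RequestMessage<RemoteFilePartitionSyncRequest>,
        offsets: RequestMessage<UpdateRemoteOffsetRequest>,
    ) -> anyhow::Result<()> {
        // record data rides along as file slices
        sink.encode_file_slices(&sync, sync.header.api_version()).await?;
        // the offset report is a plain, one-way request
        sink.send_request(&offsets).await?;
        Ok(())
    }
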
b/crates/fluvio-spu/src/mirroring/remote/remote_api.rs index 7516fa80fc..6e928a38da 100644 --- a/crates/fluvio-spu/src/mirroring/remote/remote_api.rs +++ b/crates/fluvio-spu/src/mirroring/remote/remote_api.rs @@ -8,16 +8,19 @@ use fluvio_protocol::Decoder; use fluvio_protocol::api::{RequestMessage, ApiMessage, RequestHeader}; use super::api_key::MirrorRemoteApiEnum; -use super::sync::DefaultPartitionSyncRequest; +use super::sync::DefaultRemotePartitionSyncRequest; +use super::update_offsets::UpdateRemoteOffsetRequest; +/// Requests from remote to home #[derive(Debug)] pub enum RemoteMirrorRequest { - SyncRecords(RequestMessage), + SyncRecords(RequestMessage), + UpdateRemoteOffset(RequestMessage), } impl Default for RemoteMirrorRequest { fn default() -> Self { - Self::SyncRecords(RequestMessage::::default()) + Self::SyncRecords(RequestMessage::::default()) } } @@ -33,9 +36,15 @@ impl ApiMessage for RemoteMirrorRequest { trace!("decoding with header: {:#?}", header); let version = header.api_version(); match header.api_key().try_into()? { + MirrorRemoteApiEnum::UpdateEdgeOffset => { + Ok(Self::UpdateRemoteOffset(RequestMessage::new( + header, + UpdateRemoteOffsetRequest::decode_from(src, version)?, + ))) + } MirrorRemoteApiEnum::SyncRecords => Ok(Self::SyncRecords(RequestMessage::new( header, - DefaultPartitionSyncRequest::decode_from(src, version)?, + DefaultRemotePartitionSyncRequest::decode_from(src, version)?, ))), } } diff --git a/crates/fluvio-spu/src/mirroring/remote/sync.rs b/crates/fluvio-spu/src/mirroring/remote/sync.rs index 15dd4406de..ee4eae287e 100644 --- a/crates/fluvio-spu/src/mirroring/remote/sync.rs +++ b/crates/fluvio-spu/src/mirroring/remote/sync.rs @@ -18,8 +18,8 @@ use crate::mirroring::COMMON_MIRROR_VERSION; use super::api_key::MirrorRemoteApiEnum; -pub type FilePartitionSyncRequest = MirrorPartitionSyncRequest; -pub type DefaultPartitionSyncRequest = MirrorPartitionSyncRequest>; +pub type RemoteFilePartitionSyncRequest = MirrorPartitionSyncRequest; +pub type DefaultRemotePartitionSyncRequest = MirrorPartitionSyncRequest>; #[derive(Encoder, Decoder, Default, Debug)] pub struct MirrorPartitionSyncRequest @@ -55,7 +55,7 @@ where #[derive(Default, Encoder, Decoder, Debug)] pub struct MirrorPartitionSyncResponse {} -impl FileWrite for FilePartitionSyncRequest { +impl FileWrite for RemoteFilePartitionSyncRequest { fn file_encode( &self, src: &mut BytesMut, diff --git a/crates/fluvio-spu/src/mirroring/remote/update_offsets.rs b/crates/fluvio-spu/src/mirroring/remote/update_offsets.rs new file mode 100644 index 0000000000..8afc908535 --- /dev/null +++ b/crates/fluvio-spu/src/mirroring/remote/update_offsets.rs @@ -0,0 +1,33 @@ +use fluvio_protocol::{Encoder, Decoder}; +use fluvio_protocol::api::Request; + +use crate::mirroring::COMMON_MIRROR_VERSION; +use crate::replication::leader::ReplicaOffsetRequest; + +use super::api_key::MirrorRemoteApiEnum; + +/// Edge offset +#[derive(Decoder, Encoder, Default, Clone, Debug)] +pub(crate) struct UpdateRemoteOffsetRequest(ReplicaOffsetRequest); + +impl From for UpdateRemoteOffsetRequest { + fn from(offset: ReplicaOffsetRequest) -> Self { + Self(offset) + } +} + +impl UpdateRemoteOffsetRequest { + pub fn offset(&self) -> &ReplicaOffsetRequest { + &self.0 + } +} + +impl Request for UpdateRemoteOffsetRequest { + const API_KEY: u16 = MirrorRemoteApiEnum::UpdateEdgeOffset as u16; + const DEFAULT_API_VERSION: i16 = COMMON_MIRROR_VERSION; + type Response = UpdateEdgeOffsetResponse; +} + +// no content, this is one way request +#[derive(Decoder, 
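On the remote side this request is built from the leader's current offsets and sent to home, which decodes it into RemoteMirrorRequest::UpdateRemoteOffset. An illustrative construction using the fixture's default replica and made-up offset values:

    let offsets = ReplicaOffsetRequest {
        replica: ReplicaKey::new("topic1", 0u32),
        leo: 2,
        hw: 2,
    };
    let req: RequestMessage<UpdateRemoteOffsetRequest> =
        RequestMessage::new_request(offsets.into()).set_client_id("mirror home");
    // req is now ready for FluvioSink::send_request on the connection to home
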
Encoder, Default, Debug)] +pub struct UpdateEdgeOffsetResponse {} diff --git a/crates/fluvio-spu/src/mirroring/test/fixture.rs b/crates/fluvio-spu/src/mirroring/test/fixture.rs index 8c13cc142a..608fca1c87 100644 --- a/crates/fluvio-spu/src/mirroring/test/fixture.rs +++ b/crates/fluvio-spu/src/mirroring/test/fixture.rs @@ -20,16 +20,16 @@ use crate::config::SpuConfig; use crate::core::{DefaultSharedGlobalContext, GlobalContext}; use crate::replication::leader::LeaderReplicaState; -pub(crate) fn default_topic() -> String { - "temp".to_owned() +pub(crate) fn default_topic() -> &'static str { + "topic1" } -pub(crate) fn default_host() -> String { - "127.0.0.1".to_owned() +pub(crate) fn default_host() -> &'static str { + "127.0.0.1" } -pub(crate) fn default_base_spu_key() -> String { - "temp-0".to_owned() +pub(crate) fn default_replica() -> &'static str { + "topic1-0" } // find unused port in local host @@ -38,15 +38,19 @@ pub(crate) fn local_port() -> String { format!("127.0.0.1:{port}") } -fn default_home_port() -> String { - "localhost:30000".to_owned() +fn default_home_port() -> &'static str { + "localhost:30000" } -fn default_home_cluster() -> String { - "my-home".to_owned() +pub(crate) fn default_home_cluster() -> &'static str { + "my-home" } -pub(crate) fn default_remote_topic() -> String { +pub(crate) fn default_remote_cluster() -> &'static str { + "remote1" +} + +pub(crate) fn default_remote_topic() -> &'static str { default_topic() } @@ -58,23 +62,27 @@ pub(crate) struct ReplicaConfig { base_port: u16, #[builder(default = "0")] followers: u16, - #[builder(default = "default_topic()")] + #[builder(default = "default_topic().to_owned()", setter(into))] topic: String, #[builder(default = "5001")] base_spu_id: SpuId, - #[builder(default = "default_base_spu_key()")] + #[builder(default = "default_replica().to_owned()", setter(into))] base_spu_key: String, - #[builder(default = "default_host()")] + #[builder(default = "default_host().to_owned()")] host: String, - #[builder(default = "default_home_port()")] + #[builder(default = "default_home_port().to_owned()")] home_port: String, - #[builder(default = "default_home_cluster()")] + #[builder(default = "default_home_cluster().to_owned()", setter(into))] home_cluster: String, + #[builder(default = "default_remote_cluster().to_owned()", setter(into))] + remote_cluster: String, /// if set then this is mirror home and we create multiple home partitions #[builder(default)] remote_clusters: Vec, - #[builder(default = "default_remote_topic()")] + #[builder(default = "default_remote_topic().to_owned()", setter(into))] remote_topic: String, + #[builder(default)] + home_to_remote: bool, } impl ReplicaConfig { @@ -135,6 +143,7 @@ impl ReplicaConfig { home_spu_key: self.base_spu_key.clone(), home_spu_id: self.base_spu_id, home_spu_endpoint: self.home_port.clone(), + target: self.home_to_remote, })); replica } @@ -145,6 +154,7 @@ impl ReplicaConfig { replica.mirror = Some(PartitionMirrorConfig::Home(HomePartitionConfig { remote_cluster: remote_cluster_name.to_string(), remote_replica: ReplicaKey::new(self.remote_topic.clone(), 0u32).to_string(), + source: self.home_to_remote, })); replica @@ -193,7 +203,7 @@ impl ReplicaConfig { /// creates mirror remote ctx and returns remote replica pub(crate) async fn init_mirror_remote( - &self, + self, ) -> (DefaultSharedGlobalContext, LeaderReplicaState) { let replica = self.remote_replica(); @@ -204,9 +214,9 @@ impl ReplicaConfig { name: self.home_cluster.to_owned(), spec: MirrorSpec { mirror_type: 
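The builder now accepts borrowed or owned strings (setter(into)) and a home_to_remote flag, which the fixture maps onto the source flag of home partitions and the target flag of remote partitions. Condensed usage matching the integration tests below; the cluster names are illustrative and omitting home_to_remote keeps the original edge-to-home direction:

    let home_port = local_port();

    // home cluster context with two registered remotes, mirroring home-to-remote
    let home_builder = ReplicaConfig::builder()
        .remote_clusters(vec!["remote1".to_owned(), "remote2".to_owned()])
        .home_to_remote(true)
        .generate("mirror_home");
    let home_gctx = home_builder.init_mirror_home().await;

    // one remote context paired with "remote1"
    let remote_builder = ReplicaConfig::builder()
        .home_port(home_port.clone())
        .remote_cluster("remote1")
        .home_to_remote(true)
        .generate("mirror_remote");
    let (remote_ctx, remote_replica) = remote_builder.init_mirror_remote().await;
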
MirrorType::Home(Home { - id: self.home_cluster.clone(), - remote_id: self.home_cluster.clone(), - public_endpoint: self.home_port.clone(), + id: self.home_cluster, + remote_id: self.remote_cluster, + public_endpoint: self.home_port, client_tls: None, }), }, @@ -320,7 +330,7 @@ async fn replica_leader_write_test() { let status = leader_gctx.status_update().remove_all().await; assert!(!status.is_empty()); let lrs = &status[0]; - assert_eq!(lrs.id, ("temp", 0).into()); + assert_eq!(lrs.id, ("topic1", 0).into()); assert_eq!(lrs.leader.spu, 5001); assert_eq!(lrs.leader.hw, 0); assert_eq!(lrs.leader.leo, 0); @@ -340,7 +350,7 @@ async fn replica_leader_write_test() { let status = leader_gctx.status_update().remove_all().await; assert!(!status.is_empty()); let lrs = &status[0]; - assert_eq!(lrs.id, ("temp", 0).into()); + assert_eq!(lrs.id, ("topic1", 0).into()); assert_eq!(lrs.leader.spu, 5001); assert_eq!(lrs.leader.hw, 2); assert_eq!(lrs.leader.leo, 2); diff --git a/crates/fluvio-spu/src/mirroring/test/integration.rs b/crates/fluvio-spu/src/mirroring/test/integration.rs index 0bf1d2de45..4f245e54dd 100644 --- a/crates/fluvio-spu/src/mirroring/test/integration.rs +++ b/crates/fluvio-spu/src/mirroring/test/integration.rs @@ -7,23 +7,29 @@ use fluvio_controlplane_metadata::partition::{RemotePartitionConfig, HomePartiti use fluvio_future::timer::sleep; use fluvio_protocol::{fixture::create_raw_recordset, record::ReplicaKey}; -use crate::services::{auth::SpuAuthGlobalContext, public::create_public_server}; +use crate::{ + mirroring::test::fixture::{default_home_cluster, default_replica, default_topic}, + services::{auth::SpuAuthGlobalContext, public::create_public_server}, +}; use super::fixture::{ReplicaConfig, local_port}; +const REMOTE1: &str = "remote1"; +const REMOTE2: &str = "remote2"; + /// Test mirroring when we write new records when all clusters are up #[fluvio_future::test(ignore)] -async fn test_mirroring_new_records() { +async fn test_mirroring_from_edge_to_home() { // find free port for home let home_port = local_port(); let home_builder = ReplicaConfig::builder() - .remote_clusters(vec!["edge1".to_owned(), "edge2".to_owned()]) + .remote_clusters(vec![REMOTE1.to_owned(), REMOTE2.to_owned()]) .generate("mirror_home"); let home_gctx = home_builder.init_mirror_home().await; let home_replica0 = home_gctx .leaders_state() - .get(&ReplicaKey::new("temp", 0u32)) + .get(&ReplicaKey::new(default_topic(), 0u32)) .await .expect("leader"); assert_eq!( @@ -35,31 +41,33 @@ async fn test_mirroring_new_records() { .home() .expect("home"), &HomePartitionConfig { - remote_cluster: "edge1".to_owned(), - remote_replica: "temp-0".to_owned(), + remote_cluster: REMOTE1.to_owned(), + remote_replica: default_replica().to_owned(), + source: false } ); // check if remote cluster is set let remote_cluster = home_gctx .mirrors_localstore() - .spec(&"edge1".to_string()) + .spec(&REMOTE1.to_owned()) .expect("remote cluster"); - assert_eq!(remote_cluster.name, "edge1"); + assert_eq!(remote_cluster.name, REMOTE1); debug!(remote_clusters = ?home_gctx.mirrors_localstore(), "home clusters remotes"); debug!(replicas = ?home_gctx.leaders_state().replica_configs().await, "home leaders"); - let mirror_home_replica = home_gctx + let (mirror_home_replica, source) = home_gctx .leaders_state() - .find_mirror_home_leader("edge1", "temp-0") + .find_mirror_home_leader(REMOTE1, default_replica()) .await .expect("mirror home"); - assert_eq!(mirror_home_replica.id(), &("temp", 0).into()); + assert!(!source); + 
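For reference, the pair of partition configs these tests assert against, written out with the fixture defaults; both flags are true for the reverse (home-to-remote) direction and false for the original edge-to-home direction:

    // home side of one mirrored partition
    let home_side = HomePartitionConfig {
        remote_cluster: "remote1".to_owned(),
        remote_replica: "topic1-0".to_owned(),
        source: true, // home pushes records out to the remote
    };

    // matching remote side
    let remote_side = RemotePartitionConfig {
        home_cluster: "my-home".to_owned(),
        home_spu_key: "topic1-0".to_owned(),
        home_spu_id: 5001,
        home_spu_endpoint: "localhost:30000".to_owned(),
        target: true, // remote accepts records pushed from home
    };
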
assert_eq!(mirror_home_replica.id(), &(default_topic(), 0).into()); assert_eq!(home_replica0.leo(), 0); // check 2nd home replica let home_replica1 = home_gctx .leaders_state() - .get(&ReplicaKey::new("temp", 1u32)) + .get(&ReplicaKey::new(default_topic(), 1u32)) .await .expect("2nd targert"); @@ -78,7 +86,7 @@ async fn test_mirroring_new_records() { // start 1st remote let sourcd_builder_1 = ReplicaConfig::builder() .home_port(home_port.clone()) - .home_cluster("edge1".to_owned()) + .remote_cluster(REMOTE1) .generate("mirror_remote"); let (remote_ctx1, remote_replica_1) = sourcd_builder_1.init_mirror_remote().await; @@ -90,10 +98,11 @@ async fn test_mirroring_new_records() { assert_eq!( remote_mirror1.remote().expect("remote"), &RemotePartitionConfig { - home_spu_key: "temp-0".to_owned(), - home_cluster: "edge1".to_owned(), + home_spu_key: default_replica().to_owned(), + home_cluster: default_home_cluster().to_owned(), home_spu_id: 5001, home_spu_endpoint: home_port.clone(), + target: false } ); @@ -123,7 +132,7 @@ async fn test_mirroring_new_records() { // start 2nd remote let sourcd_builder2 = ReplicaConfig::builder() .home_port(home_port.clone()) - .home_cluster("edge2".to_owned()) + .remote_cluster(REMOTE2) .generate("mirror_remote"); let (_remote_ctx2, remote_replica2) = sourcd_builder2.init_mirror_remote().await; @@ -135,10 +144,11 @@ async fn test_mirroring_new_records() { assert_eq!( remote_mirror2.remote().expect("remote"), &RemotePartitionConfig { - home_spu_key: "temp-0".to_owned(), - home_cluster: "edge2".to_owned(), + home_spu_key: default_replica().to_owned(), + home_cluster: default_home_cluster().to_owned(), home_spu_id: 5001, home_spu_endpoint: home_port.clone(), + ..Default::default() } ); @@ -163,3 +173,160 @@ async fn test_mirroring_new_records() { // home should have recods assert_eq!(home_replica1.leo(), 2); } + +/// Test mirroring from home to edge +#[fluvio_future::test(ignore)] +async fn test_mirror_home_to_edge() { + // find free port for home + let home_port = local_port(); + + let home_builder = ReplicaConfig::builder() + .remote_clusters(vec![REMOTE1.to_owned(), REMOTE2.to_owned()]) + .home_to_remote(true) + .generate("mirror_home"); + let home_gctx = home_builder.init_mirror_home().await; + let home_replica0 = home_gctx + .leaders_state() + .get(&ReplicaKey::new(default_topic(), 0u32)) + .await + .expect("leader"); + assert_eq!( + home_replica0 + .get_replica() + .mirror + .as_ref() + .expect("mirror") + .home() + .expect("home"), + &HomePartitionConfig { + remote_cluster: REMOTE1.to_owned(), + remote_replica: default_replica().to_owned(), + source: true + } + ); + // check if remote cluster is set + let remote_cluster = home_gctx + .mirrors_localstore() + .spec(&REMOTE1.to_owned()) + .expect("remote cluster"); + assert_eq!(remote_cluster.name, REMOTE1); + + debug!(remote_clusters = ?home_gctx.mirrors_localstore(), "home clusters remotes"); + debug!(replicas = ?home_gctx.leaders_state().replica_configs().await, "home leaders"); + let (mirror_home_replica, source) = home_gctx + .leaders_state() + .find_mirror_home_leader(REMOTE1, default_replica()) + .await + .expect("mirror home"); + assert!(source); + assert_eq!(mirror_home_replica.id(), &(default_topic(), 0).into()); + assert_eq!(home_replica0.leo(), 0); + + // check 2nd home replica + + let home_replica1 = home_gctx + .leaders_state() + .get(&ReplicaKey::new(default_topic(), 1u32)) + .await + .expect("2nd targert"); + + // start home server + debug!("starting home server"); + + let auth_global_ctx = + 
SpuAuthGlobalContext::new(home_gctx.clone(), Arc::new(RootAuthorization::new())); + let _remote_end = create_public_server(home_port.to_owned(), auth_global_ctx.clone()).run(); + + // sleep 1 seconds + debug!("waiting for home public server to up"); + sleep(Duration::from_secs(1)).await; + debug!("done waiting"); + + // start 1st remote + let sourcd_builder_1 = ReplicaConfig::builder() + .home_port(home_port.clone()) + .remote_cluster(REMOTE1) + .home_to_remote(true) + .generate("mirror_remote"); + + let (_remote_ctx1, remote_replica_1) = sourcd_builder_1.init_mirror_remote().await; + let remote_mirror1 = remote_replica_1 + .get_replica() + .mirror + .as_ref() + .expect("mirror"); + assert_eq!( + remote_mirror1.remote().expect("remote"), + &RemotePartitionConfig { + home_spu_key: default_replica().to_owned(), + home_cluster: default_home_cluster().to_owned(), + home_spu_id: 5001, + home_spu_endpoint: home_port.clone(), + target: true + } + ); + + // sleep 1 seconds + debug!("waiting for mirror remote controller to startup"); + sleep(Duration::from_secs(1)).await; + debug!("done waiting"); + + // write records from home + home_replica0 + .write_record_set(&mut create_raw_recordset(2), home_gctx.follower_notifier()) + .await + .expect("write"); + + assert_eq!(home_replica0.leo(), 2); + + // wait to replicate + debug!("waiting for mirroring"); + sleep(Duration::from_secs(2)).await; + debug!("done waiting"); + + // home should have recods + assert_eq!(remote_replica_1.leo(), 2); + + // start 2nd remote + let sourcd_builder2 = ReplicaConfig::builder() + .home_port(home_port.clone()) + .remote_cluster(REMOTE2) + .home_to_remote(true) + .generate("mirror_remote"); + + let (_remote_ctx2, remote_replica2) = sourcd_builder2.init_mirror_remote().await; + let remote_mirror2 = remote_replica2 + .get_replica() + .mirror + .as_ref() + .expect("mirror"); + assert_eq!( + remote_mirror2.remote().expect("remote"), + &RemotePartitionConfig { + home_spu_key: default_replica().to_owned(), + home_cluster: default_home_cluster().to_owned(), + home_spu_id: 5001, + home_spu_endpoint: home_port.clone(), + target: true + } + ); + + // sleep 1 seconds + debug!("waiting for mirror remote controller 2nd to start up"); + sleep(Duration::from_secs(1)).await; + debug!("done waiting"); + + // write records from home + home_replica1 + .write_record_set(&mut create_raw_recordset(2), home_gctx.follower_notifier()) + .await + .expect("write"); + + assert_eq!(home_replica1.leo(), 2); + + debug!("waiting for mirroring"); + sleep(Duration::from_secs(5)).await; + debug!("done waiting"); + // home should have recods + assert_eq!(home_replica1.leo(), 2); +} diff --git a/crates/fluvio-spu/src/replication/leader/leaders_state.rs b/crates/fluvio-spu/src/replication/leader/leaders_state.rs index 71f6638311..fab25b1a1c 100644 --- a/crates/fluvio-spu/src/replication/leader/leaders_state.rs +++ b/crates/fluvio-spu/src/replication/leader/leaders_state.rs @@ -82,17 +82,18 @@ where } /// find replica with mirror target that matches remote cluster and sourcre replica - pub async fn find_mirror_home_leader( + /// also return if it is source or target + pub(crate) async fn find_mirror_home_leader( &self, remote_cluster: &str, home_replica: &str, - ) -> Option> { + ) -> Option<(SharedLeaderState, bool)> { let read = self.read().await; for (_replica_key, state) in read.iter() { let replica_config = state.get_replica(); if let Some(PartitionMirrorConfig::Home(home)) = &replica_config.mirror { if home.remote_cluster == remote_cluster && 
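find_mirror_home_leader now also reports the partition's direction, so the home connection handler can choose the right sync behaviour without re-reading the replica config. A caller's-eye sketch, with ctx standing in for any DefaultSharedGlobalContext as used in the tests above:

    if let Some((leader, source)) = ctx
        .leaders_state()
        .find_mirror_home_leader("remote1", "topic1-0")
        .await
    {
        if source {
            // home-to-remote: home streams its own records to this remote
        } else {
            // edge-to-home: home appends the records the remote pushes up
        }
        let _ = leader; // SharedLeaderState for the matching partition
    }
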
home.remote_replica == home_replica { - return Some(state.clone()); + return Some((state.clone(), home.source)); } } } diff --git a/crates/fluvio-spu/src/services/public/produce_handler.rs b/crates/fluvio-spu/src/services/public/produce_handler.rs index d723d1fbc2..4452c2f686 100644 --- a/crates/fluvio-spu/src/services/public/produce_handler.rs +++ b/crates/fluvio-spu/src/services/public/produce_handler.rs @@ -115,12 +115,11 @@ async fn handle_produce_topic( }; if let Some(mirror) = &leader_state.get_replica().mirror { - if mirror.is_home_mirror() { + if let Some(err) = mirror.accept_traffic() { debug!(%replica_id, "Mirror replica is not supported for produce"); - topic_result.partitions.push(PartitionWriteResult::error( - replica_id, - ErrorCode::MirrorProduceFromHome, - )); + topic_result + .partitions + .push(PartitionWriteResult::error(replica_id, err)); continue; } } diff --git a/crates/fluvio/Cargo.toml b/crates/fluvio/Cargo.toml index 1af42b9ec1..fbc3bed577 100644 --- a/crates/fluvio/Cargo.toml +++ b/crates/fluvio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio" -version = "0.23.4" +version = "0.24.0" edition = "2021" license = "Apache-2.0" authors = ["Fluvio Contributors "] diff --git a/k8-util/helm/fluvio-sys/Chart.yaml b/k8-util/helm/fluvio-sys/Chart.yaml index f2fded0aa8..063e0310c6 100644 --- a/k8-util/helm/fluvio-sys/Chart.yaml +++ b/k8-util/helm/fluvio-sys/Chart.yaml @@ -2,4 +2,4 @@ apiVersion: v2 name: fluvio-sys description: A Helm chart for Fluvio type: application -version: 0.9.19 +version: 0.9.20 diff --git a/k8-util/helm/fluvio-sys/templates/crd_partition.yaml b/k8-util/helm/fluvio-sys/templates/crd_partition.yaml index 3b836c97ac..48d4687d85 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_partition.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_partition.yaml @@ -42,6 +42,8 @@ spec: type: string remoteCluster: type: string + source: + type: boolean remote: type: object required: ["homeCluster","homeSpuKey","homeSpuEndpoint","homeSpu"] @@ -55,6 +57,8 @@ spec: homeSpuId: type: integer minimum: 0 + target: + type: boolean cleanupPolicy: type: object properties: diff --git a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml index c13849c4c7..ccd2326c4f 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml @@ -67,15 +67,23 @@ spec: - required: ["remote"] properties: home: - type: array - items: - type: object - required: ["remoteCluster","remoteReplica"] - properties: - remoteCluster: - type: string - remoteReplica: - type: string + x-kubernetes-preserve-unknown-fields: true + type: object + properties: + partitions: + type: array + items: + type: object + required: ["remoteCluster", "remoteReplica"] + properties: + remoteCluster: + type: string + remoteReplica: + type: string + source: + type: boolean + source: + type: boolean remote: type: object required: ["homeCluster","homeSpus"] @@ -95,6 +103,9 @@ spec: type: string endpoint: type: string + target: + type: boolean + cleanupPolicy: type: object properties: @@ -186,4 +197,3 @@ spec: # field of custom resources to the proper value strategy: None - diff --git a/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats b/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats index 3ec925ecf8..a2dfcbd6c0 100644 --- a/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats +++ b/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats @@ -16,7 +16,7 @@ setup_file() { export REMOTE_PROFILE debug_msg "Remote profile: 
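The produce handler above now gates writes through accept_traffic() instead of the old is_home_mirror() check. A plausible shape for that rule, spelled out for clarity; the real implementation lives in fluvio-controlplane-metadata, and the variant name for the new remote-target error is an assumption (only MirrorProduceFromHome comes from the removed line):

    // assumed sketch: reject produce on the non-writable end of a mirror pair
    fn accept_traffic(mirror: &PartitionMirrorConfig) -> Option<ErrorCode> {
        match mirror {
            // a home partition only accepts produce when it is the mirror source
            PartitionMirrorConfig::Home(home) if !home.source => {
                Some(ErrorCode::MirrorProduceFromHome)
            }
            // a remote partition that is the mirror target rejects produce
            // ("produce from remote target is not allowed" in the bats tests)
            PartitionMirrorConfig::Remote(remote) if remote.target => {
                Some(ErrorCode::MirrorProduceFromRemote) // assumed variant name
            }
            _ => None,
        }
    }
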
$REMOTE_PROFILE" - REMOTE_NAME=edge1 + REMOTE_NAME=remote1 export REMOTE_NAME debug_msg "Remote name: $REMOTE_NAME" @@ -24,7 +24,7 @@ setup_file() { export REMOTE_PROFILE_2 debug_msg "Remote profile: $REMOTE_PROFILE_2" - REMOTE_NAME_2=edge2 + REMOTE_NAME_2=remote2 export REMOTE_NAME_2 debug_msg "Remote name: $REMOTE_NAME_2" @@ -36,6 +36,10 @@ setup_file() { export TOPIC_NAME debug_msg "Topic name: $TOPIC_NAME" + REVERSE_TOPIC_NAME=mirror-topic-reverse + export REVERSE_TOPIC_NAME + debug_msg "Topic name: $REVERSE_TOPIC_NAME" + HOME_PROFILE=$($FLUVIO_BIN profile) export HOME_PROFILE debug_msg "Home profile: $HOME_PROFILE" @@ -107,12 +111,32 @@ setup_file() { assert_success } +@test "Can create a mirror topic with the remote clusters reverse" { + echo "[\"$REMOTE_NAME\",\"$REMOTE_NAME_2\"]" > remotes_devices.json + run timeout 15s "$FLUVIO_BIN" topic create "$REVERSE_TOPIC_NAME" --mirror-apply remotes_devices.json --home-to-remote + + assert_output "topic \"$REVERSE_TOPIC_NAME\" created" + assert_success +} + +@test "Can produce message to reverse mirror topic from home" { + run bash -c 'echo 3 | timeout 15s "$FLUVIO_BIN" produce "$REVERSE_TOPIC_NAME"' + assert_success + run bash -c 'echo c | timeout 15s "$FLUVIO_BIN" produce "$REVERSE_TOPIC_NAME"' + assert_success + run bash -c 'echo 4 | timeout 15s "$FLUVIO_BIN" produce "$REVERSE_TOPIC_NAME"' + assert_success + run bash -c 'echo d | timeout 15s "$FLUVIO_BIN" produce "$REVERSE_TOPIC_NAME"' + assert_success +} + @test "Can switch to remote cluster 1" { run timeout 15s "$FLUVIO_BIN" profile switch "$REMOTE_PROFILE" assert_output "" assert_success } + #TODO: actually we should have the topics created by the export file too. @test "Can't produce message to mirror topic from remote 1 before connect to home" { sleep 5 @@ -150,6 +174,40 @@ setup_file() { assert_success } +@test "Can't produce message to reverse mirror topic from remote 1" { + run bash -c 'echo 1 | timeout 15s "$FLUVIO_BIN" produce "$REVERSE_TOPIC_NAME"' + + assert_output "Producer error: Producer received an error code: produce from remote target is not allowed" + assert_failure +} + +@test "Can consume message from reverse mirror topic from remote 1" { + sleep 5 + run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 0 -B -d + assert_output 3$'\n'c$'\n'4$'\n'd + assert_success +} + +@test "Can switch back to home cluster" { + run timeout 15s "$FLUVIO_BIN" profile switch "$HOME_PROFILE" + assert_output "" + assert_success +} + + +#TODO: we don't have a way to produce directly for a partition, +# when we have it, we should create a test to produce to a partition 1 than should be consumed by remote 2 +# @test "Can produce message to reverse mirror topic from home again" { +# run bash -c 'echo 5 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' +# assert_success +# run bash -c 'echo e | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' +# assert_success +# run bash -c 'echo 6 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' +# assert_success +# run bash -c 'echo f | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' +# assert_success +# } + @test "Can switch to remote cluster 2" { run timeout 15s "$FLUVIO_BIN" profile switch "$REMOTE_PROFILE_2" assert_output "" @@ -185,6 +243,22 @@ setup_file() { assert_success } +@test "Can't produce message to reverse mirror topic from remote 2" { + run bash -c 'echo 9 | timeout 15s "$FLUVIO_BIN" produce "$REVERSE_TOPIC_NAME"' + + assert_output "Producer error: Producer received an error 
code: produce from remote target is not allowed" + assert_failure +} + +#TODO: we don't have a way to produce directly for a partition, +# when we have it, we should create a test to produce to a partition 1 than should be consumed by remote 2 +# @test "Can consume message from reverse mirror topic from remote 2" { +# sleep 5 +# run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 0 -B -d +# assert_output 5$'\n'e$'\n'6$'\n'f +# assert_success +# } + @test "Can't delete mirror topic from remote 2" { run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME" @@ -192,7 +266,7 @@ setup_file() { assert_failure } -@test "Can switch back to home cluster" { +@test "Can switch back to home cluster again" { run timeout 15s "$FLUVIO_BIN" profile switch "$HOME_PROFILE" assert_output "" assert_success @@ -208,6 +282,13 @@ setup_file() { assert [ ${#lines[@]} -eq 3 ] } +@test "Can consume message from reverse mirror topic from home" { + sleep 5 + run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 0 -B -d + assert_output 3$'\n'c$'\n'4$'\n'd + assert_success +} + @test "Can consume message from mirror topic produced from remote 1 by partition or remote" { sleep 5 run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" -p 0 -B -d @@ -234,9 +315,13 @@ setup_file() { @test "Can delete mirror topic" { run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME" - assert_output "topic \"$TOPIC_NAME\" deleted" assert_success + + + run timeout 15s "$FLUVIO_BIN" topic delete "$REVERSE_TOPIC_NAME" + assert_output "topic \"$REVERSE_TOPIC_NAME\" deleted" + assert_success } @test "Can switch to remote cluster 1 and check if the mirror topic is deleted" { diff --git a/tests/cli/mirroring_smoke_tests/mirror-topic-reverse.bats b/tests/cli/mirroring_smoke_tests/mirror-topic-reverse.bats new file mode 100644 index 0000000000..741f208b8a --- /dev/null +++ b/tests/cli/mirroring_smoke_tests/mirror-topic-reverse.bats @@ -0,0 +1,91 @@ +#!/usr/bin/env bats + +TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper" +export TEST_HELPER_DIR + +load "$TEST_HELPER_DIR"/tools_check.bash +load "$TEST_HELPER_DIR"/fluvio_dev.bash +load "$TEST_HELPER_DIR"/bats-support/load.bash +load "$TEST_HELPER_DIR"/bats-assert/load.bash + +setup_file() { + CURRENT_DATE=$(date +%Y-%m) + export CURRENT_DATE + + REMOTE_NAME="$(random_string 7)" + export REMOTE_NAME + debug_msg "Remote name: $REMOTE_NAME" + + REMOTE_NAME_2="$(random_string 7)" + export REMOTE_NAME_2 + debug_msg "Remote name 2: $REMOTE_NAME_2" + + MESSAGE="$(random_string 7)" + export MESSAGE + debug_msg "$MESSAGE" + + TOPIC_NAME="$(random_string 7)" + export TOPIC_NAME + debug_msg "Topic name: $TOPIC_NAME" +} + +@test "Can register an remote clusters" { + run timeout 15s "$FLUVIO_BIN" remote register "$REMOTE_NAME" + + assert_output "remote cluster \"$REMOTE_NAME\" was registered" + assert_success + + run timeout 15s "$FLUVIO_BIN" remote register "$REMOTE_NAME_2" + + assert_output "remote cluster \"$REMOTE_NAME_2\" was registered" + assert_success +} + +@test "Can create a mirror topic" { + echo "[\"$REMOTE_NAME\"]" > remotes.json + run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME" --mirror-apply remotes.json --home-to-remote + + assert_output "topic \"$TOPIC_NAME\" created" + assert_success +} + +@test "Can add a new remote to the mirror topic" { + run timeout 15s "$FLUVIO_BIN" topic add-mirror "$TOPIC_NAME" "$REMOTE_NAME_2" --home-to-remote + + assert_output "added new mirror: \"$REMOTE_NAME_2\" to topic: \"$TOPIC_NAME\"" + assert_success +} + +@test "Can't add a non 
existent remote to the mirror topic" { + run timeout 15s "$FLUVIO_BIN" topic add-mirror "$TOPIC_NAME" "nonexistent-remote" --home-to-remote + + assert_output "Mirror not found" + assert_failure +} + +@test "Can't add a remote to the mirror topic that doesn't exist" { + run timeout 15s "$FLUVIO_BIN" topic add-mirror "nonexistent-topic" "$REMOTE_NAME" --home-to-remote + + assert_output "Topic not found" + assert_failure +} + +@test "Can't add a remote to the mirror topic that is already assigned" { + run timeout 15s "$FLUVIO_BIN" topic add-mirror "$TOPIC_NAME" "$REMOTE_NAME" --home-to-remote + + assert_output "remote \"$REMOTE_NAME\" is already assigned to partition: \"0\"" + assert_failure +} + + +@test "List topics" { + run bash -c 'timeout 15s "$FLUVIO_BIN" topic list | grep "$TOPIC_NAME"' + assert_success + assert_line --partial --index 0 "$TOPIC_NAME from-remote" +} + +@test "List partitions" { + run bash -c 'timeout 15s "$FLUVIO_BIN" partition list | grep "$TOPIC_NAME"' + assert_success + assert_line --partial --index 0 "$TOPIC_NAME 0 5001 $REMOTE_NAME(from-remote)" +}