Skip to content

Commit

Permalink
Add the ability to opt-out from automatic partial events merging
Browse files Browse the repository at this point in the history
Signed-off-by: MOZGIII <mike-n@narod.ru>
  • Loading branch information
MOZGIII committed Mar 24, 2020
1 parent 9310817 commit 4a7a410
Showing 1 changed file with 19 additions and 6 deletions.
25 changes: 19 additions & 6 deletions src/sources/kubernetes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ enum BuildError {
IllegalCharacterInUid { uid: String },
}

#[derive(Deserialize, Serialize, Debug, Clone, Default)]
#[derive(Deserialize, Serialize, Debug, Clone, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields, default)]
pub struct KubernetesConfig {
include_container_names: Vec<String>,
include_pod_uids: Vec<String>,
include_namespaces: Vec<String>,
#[derivative(Default(value = "false"))]
auto_partial_merge: bool,
}

#[typetag::serde(name = "kubernetes")]
Expand All @@ -72,15 +75,25 @@ impl SourceConfig for KubernetesConfig {
let mut transform_file = transform_file()?;
let mut transform_pod_uid = transform_pod_uid()?;
let mut parse_message = message_parser::build_message_parser()?;
let mut transform_merge_partial_events = transform_merge_partial_events();

// Kubernetes source
let source = file_recv
let stream = file_recv
.filter_map(move |event| transform_file.transform(event))
.filter_map(move |event| parse_message.transform(event))
.filter_map(move |event| now.filter(event))
.filter_map(move |event| transform_merge_partial_events.transform(event))
.filter_map(move |event| transform_pod_uid.transform(event))
.filter_map(move |event| now.filter(event));

let stream: Box<dyn Stream<Item = Event, Error = _> + Send> = if self.auto_partial_merge {
let mut transform_merge_partial_events = transform_merge_partial_events();
Box::new(
stream.filter_map(move |event| transform_merge_partial_events.transform(event)),
)
} else {
Box::new(stream)
};

let stream = stream.filter_map(move |event| transform_pod_uid.transform(event));

let source = stream
.forward(out.sink_map_err(drop))
.map(drop)
.join(file_source)
Expand Down

0 comments on commit 4a7a410

Please sign in to comment.