Skip to content

Commit

Permalink
test: update chekov messaging tests
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
  • Loading branch information
Freyskeyd committed Nov 11, 2023
1 parent d212b72 commit a536317
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ serde = { version = "1.0.117", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["chrono", "time", "uuid", "json", "offline", "runtime-actix-native-tls"] }
async-trait = "0.1.51"
serde_json = "1.0.68"
actix = "0.12.0"
actix = "0.13.0"
futures = "0.3.17"
log = "0.4.14"
tokio = { version = "1.12.0", features = ["full"] }
Expand Down
28 changes: 13 additions & 15 deletions crates/chekov/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct EventMetadatas {

#[doc(hidden)]
#[derive(Debug, Clone, Message)]
#[rtype(result = "Result(), ()>")]
#[rtype(result = "Result<(), ()>")]
pub struct EventEnvelope<E: Event> {
pub event: E,
pub meta: EventMetadatas,
Expand All @@ -65,55 +65,53 @@ pub struct ResolveAndApply(pub RecordedEvent);
pub struct ResolveAndApplyMany(pub Vec<RecordedEvent>);

#[derive(Message)]
#[rtype("Result<Vec<RecordedEvent>, event_store::prelude::EventStoreError>")]
#[rtype(result = "Result<Vec<RecordedEvent>, event_store::prelude::EventStoreError>")]
pub(crate) struct ExecuteReader(pub(crate) event_store::prelude::Reader);

#[derive(Message)]
#[rtype(
"Result<
#[rtype(result = "Result<
std::pin::Pin<
Box<dyn futures::Stream<Item = Result<Vec<RecordedEvent>, EventStoreError>> + Send>,
>,
EventStoreError,
>"
)]
>")]
pub(crate) struct ExecuteStreamForward(pub(crate) String);

#[derive(Message)]
#[rtype("Result<Vec<Uuid>, event_store::prelude::EventStoreError>")]
#[rtype(result = "Result<Vec<Uuid>, event_store::prelude::EventStoreError>")]
pub(crate) struct ExecuteAppender(pub(crate) event_store::prelude::Appender);

#[derive(Message)]
#[rtype("Result<event_store::prelude::Stream, event_store::prelude::EventStoreError>")]
#[rtype(result = "Result<event_store::prelude::Stream, event_store::prelude::EventStoreError>")]
pub(crate) struct ExecuteStreamInfo(pub(crate) String);

#[derive(Message)]
#[rtype("u64")]
#[rtype(result = "u64")]
pub(crate) struct AggregateVersion;

#[derive(Message)]
#[rtype("A")]
#[rtype(result = "A")]
pub(crate) struct AggregateState<A: Aggregate>(pub(crate) PhantomData<A>);

#[derive(Message, Debug)]
#[rtype("()")]
#[rtype(result = "()")]
pub(crate) struct StartListening;

#[derive(Message)]
#[rtype("Addr<event_store::EventStore<S>>")]
#[rtype(result = "Addr<event_store::EventStore<S>>")]
pub(crate) struct GetEventStoreAddr<S: Storage> {
pub(crate) _phantom: PhantomData<S>,
}

#[derive(Message)]
#[rtype("Option<Addr<crate::aggregate::AggregateInstance<A>>>")]
#[rtype(result = "Option<Addr<crate::aggregate::AggregateInstance<A>>>")]
pub(crate) struct GetAggregateAddr<A: Aggregate> {
pub(crate) identifier: String,
pub(crate) _phantom: PhantomData<A>,
}

#[derive(Message)]
#[rtype("Result<Addr<crate::aggregate::AggregateInstance<A>>, ()>")]
#[rtype(result = "Result<Addr<crate::aggregate::AggregateInstance<A>>, ()>")]
pub(crate) struct StartAggregate<A: Aggregate, APP: Application> {
pub(crate) identifier: String,
pub(crate) correlation_id: Option<Uuid>,
Expand All @@ -122,7 +120,7 @@ pub(crate) struct StartAggregate<A: Aggregate, APP: Application> {
}

#[derive(Message)]
#[rtype("Result<(), ()>")]
#[rtype(result = "Result<(), ()>")]
pub(crate) struct ShutdownAggregate {
pub(crate) identifier: String,
}
2 changes: 1 addition & 1 deletion crates/event_store-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub trait Event: Serialize + Send + std::convert::TryFrom<RecordedEvent> {

/// A `RecordedEvent` represents an `Event` which have been append to a `Stream`
#[derive(sqlx::FromRow, Debug, Clone, Message, Serialize)]
#[rtype("()")]
#[rtype(result = "()")]
pub struct RecordedEvent {
/// an incrementing and gapless integer used to order the event in a stream.
#[sqlx(try_from = "i64")]
Expand Down
2 changes: 1 addition & 1 deletion crates/event_store-core/src/event_bus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub type BoxedStream = Pin<
>;

#[derive(Debug, Message)]
#[rtype("()")]
#[rtype(result = "()")]
pub enum EventBusMessage {
Notification(EventNotification),
Events(String, Vec<RecordedEvent>),
Expand Down
2 changes: 1 addition & 1 deletion crates/event_store/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod messaging;
pub use messaging::{Append, CreateStream, Read, StreamForward, StreamForwardResult, StreamInfo};

#[derive(Message)]
#[rtype("()")]
#[rtype(result = "()")]
// TODO: Remove this by a better subscription channel
pub struct OpenNotificationChannel {
pub(crate) sender: mpsc::UnboundedSender<EventBusMessage>,
Expand Down
2 changes: 1 addition & 1 deletion crates/event_store/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use crate::core::event::UnsavedEvent;
pub use crate::core::event::UnsavedEventError;

Check warning on line 6 in crates/event_store/src/event/mod.rs

View workflow job for this annotation

GitHub Actions / Coverage

unused import: `crate::core::event::UnsavedEventError`

#[derive(Debug, Clone, Message)]
#[rtype("()")]
#[rtype(result = "()")]
pub struct RecordedEvents {
pub(crate) events: Vec<RecordedEvent>,
}
6 changes: 3 additions & 3 deletions crates/event_store/src/subscriptions/pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ impl PubSub {
}

#[derive(Message)]
#[rtype("()")]
#[rtype(result = "()")]
struct Subscribe(Recipient<SubscriptionNotification>, String);

#[derive(Message)]
#[rtype("usize")]
#[rtype(result = "usize")]
struct GetSubscriberCountForStream(String);

#[derive(Message)]
#[rtype("()")]
#[rtype(result = "()")]
pub struct PubSubNotification {
pub(crate) stream: String,
pub(crate) events: Vec<RecordedEvent>,
Expand Down
4 changes: 2 additions & 2 deletions crates/event_store/src/subscriptions/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ use actix::prelude::*;
use tracing::debug;

#[derive(Debug, Message)]
#[rtype("()")]
#[rtype(result = "()")]
struct Connect(pub Recipient<SubscriptionNotification>, SubscriptionOptions);

#[derive(Debug, Message)]
#[rtype("()")]
#[rtype(result = "()")]
pub struct CatchUp;

#[derive(Debug)]
Expand Down
6 changes: 3 additions & 3 deletions crates/event_store/src/subscriptions/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<S: Storage> Handler<CreateSubscription<S>> for SubscriptionsSupervisor<S> {
}

#[derive(Message)]
#[rtype("()")]
#[rtype(result = "()")]
pub struct Started;

impl<S: Storage> Handler<Started> for SubscriptionsSupervisor<S> {
Expand All @@ -97,7 +97,7 @@ impl<S: Storage> Handler<Started> for SubscriptionsSupervisor<S> {
}

#[derive(Message)]
#[rtype("()")]
#[rtype(result = "()")]
pub struct GoingDown;

impl<S: Storage> Handler<GoingDown> for SubscriptionsSupervisor<S> {
Expand All @@ -109,7 +109,7 @@ impl<S: Storage> Handler<GoingDown> for SubscriptionsSupervisor<S> {
}

#[derive(Message)]
#[rtype("()")]
#[rtype(result = "()")]
pub struct Down;

impl<S: Storage> Handler<Down> for SubscriptionsSupervisor<S> {
Expand Down

0 comments on commit a536317

Please sign in to comment.