diff --git a/src/main/java/ch/rasc/sse/eventbus/DefaultSubscriptionRegistry.java b/src/main/java/ch/rasc/sse/eventbus/DefaultSubscriptionRegistry.java index be2c593..dd2575f 100644 --- a/src/main/java/ch/rasc/sse/eventbus/DefaultSubscriptionRegistry.java +++ b/src/main/java/ch/rasc/sse/eventbus/DefaultSubscriptionRegistry.java @@ -50,8 +50,7 @@ public void subscribe(String clientId, String event) { */ @Override public void unsubscribe(String clientId, String event) { - this.eventSubscribers.computeIfPresent(event, - (k, set) -> set.remove(clientId) && set.isEmpty() ? null : set); + this.eventSubscribers.computeIfPresent(event, (k, set) -> set.remove(clientId) && set.isEmpty() ? null : set); } /* diff --git a/src/main/java/ch/rasc/sse/eventbus/JacksonDataObjectConverter.java b/src/main/java/ch/rasc/sse/eventbus/JacksonDataObjectConverter.java index 014e671..f2e8c3c 100644 --- a/src/main/java/ch/rasc/sse/eventbus/JacksonDataObjectConverter.java +++ b/src/main/java/ch/rasc/sse/eventbus/JacksonDataObjectConverter.java @@ -39,8 +39,7 @@ public String convert(SseEvent event) { return this.objectMapper.writeValueAsString(event.data()); } - return this.objectMapper.writerWithView(event.jsonView().get()) - .writeValueAsString(event.data()); + return this.objectMapper.writerWithView(event.jsonView().get()).writeValueAsString(event.data()); } catch (JsonProcessingException e) { throw new RuntimeException(e); diff --git a/src/main/java/ch/rasc/sse/eventbus/SseEvent.java b/src/main/java/ch/rasc/sse/eventbus/SseEvent.java index b172935..179750d 100644 --- a/src/main/java/ch/rasc/sse/eventbus/SseEvent.java +++ b/src/main/java/ch/rasc/sse/eventbus/SseEvent.java @@ -22,8 +22,7 @@ import org.immutables.value.Value; import org.immutables.value.Value.Style.ImplementationVisibility; -@Value.Style(depluralize = true, visibility = ImplementationVisibility.PACKAGE, - overshadowImplementation = true) +@Value.Style(depluralize = true, visibility = ImplementationVisibility.PACKAGE, overshadowImplementation = true) @Value.Immutable public interface SseEvent { diff --git a/src/main/java/ch/rasc/sse/eventbus/SseEventBus.java b/src/main/java/ch/rasc/sse/eventbus/SseEventBus.java index b2464b7..3af0069 100644 --- a/src/main/java/ch/rasc/sse/eventbus/SseEventBus.java +++ b/src/main/java/ch/rasc/sse/eventbus/SseEventBus.java @@ -52,8 +52,7 @@ public class SseEventBus { private final SseEventBusListener listener; - public SseEventBus(SseEventBusConfigurer configurer, - SubscriptionRegistry subscriptionRegistry) { + public SseEventBus(SseEventBusConfigurer configurer, SubscriptionRegistry subscriptionRegistry) { this.subscriptionRegistry = subscriptionRegistry; @@ -71,9 +70,9 @@ public SseEventBus(SseEventBusConfigurer configurer, if (this.taskScheduler != null) { this.taskScheduler.submit(this::eventLoop); this.taskScheduler.scheduleWithFixedDelay(this::reScheduleFailedEvents, 0, - configurer.schedulerDelay().toMillis(), TimeUnit.MILLISECONDS); + configurer.schedulerDelay().toMillis(), TimeUnit.MILLISECONDS); this.taskScheduler.scheduleWithFixedDelay(this::cleanUpClients, 0, - configurer.clientExpirationJobDelay().toMillis(), TimeUnit.MILLISECONDS); + configurer.clientExpirationJobDelay().toMillis(), TimeUnit.MILLISECONDS); } } @@ -91,8 +90,7 @@ public SseEmitter createSseEmitter(String clientId, String... events) { return createSseEmitter(clientId, 180_000L, false, false, events); } - public SseEmitter createSseEmitter(String clientId, boolean unsubscribe, - String... events) { + public SseEmitter createSseEmitter(String clientId, boolean unsubscribe, String... events) { return createSseEmitter(clientId, 180_000L, unsubscribe, false, events); } @@ -100,8 +98,7 @@ public SseEmitter createSseEmitter(String clientId, Long timeout, String... even return createSseEmitter(clientId, timeout, false, false, events); } - public SseEmitter createSseEmitter(String clientId, Long timeout, boolean unsubscribe, - String... events) { + public SseEmitter createSseEmitter(String clientId, Long timeout, boolean unsubscribe, String... events) { return createSseEmitter(clientId, timeout, unsubscribe, false, events); } @@ -115,8 +112,8 @@ public SseEmitter createSseEmitter(String clientId, Long timeout, boolean unsubs * @param events events the client wants to subscribe * @return a new SseEmitter instance */ - public SseEmitter createSseEmitter(String clientId, Long timeout, boolean unsubscribe, - boolean completeAfterMessage, String... events) { + public SseEmitter createSseEmitter(String clientId, Long timeout, boolean unsubscribe, boolean completeAfterMessage, + String... events) { SseEmitter emitter = new SseEmitter(timeout); emitter.onTimeout(emitter::complete); registerClient(clientId, emitter, completeAfterMessage); @@ -137,12 +134,10 @@ public void registerClient(String clientId, SseEmitter emitter) { this.registerClient(clientId, emitter, false); } - public void registerClient(String clientId, SseEmitter emitter, - boolean completeAfterMessage) { + public void registerClient(String clientId, SseEmitter emitter, boolean completeAfterMessage) { Client client = this.clients.get(clientId); if (client == null) { - this.clients.put(clientId, - new Client(clientId, emitter, completeAfterMessage)); + this.clients.put(clientId, new Client(clientId, emitter, completeAfterMessage)); } else { client.updateEmitter(emitter); @@ -209,10 +204,8 @@ public void handleEvent(SseEvent event) { if (event.clientIds().isEmpty()) { for (Client client : this.clients.values()) { if (!event.excludeClientIds().contains(client.getId()) - && this.subscriptionRegistry.isClientSubscribedToEvent( - client.getId(), event.event())) { - ClientEvent clientEvent = new ClientEvent(client, event, - convertedValue); + && this.subscriptionRegistry.isClientSubscribedToEvent(client.getId(), event.event())) { + ClientEvent clientEvent = new ClientEvent(client, event, convertedValue); this.sendQueue.put(clientEvent); this.listener.afterEventQueued(clientEvent, true); } @@ -220,10 +213,8 @@ public void handleEvent(SseEvent event) { } else { for (String clientId : event.clientIds()) { - if (this.subscriptionRegistry.isClientSubscribedToEvent(clientId, - event.event())) { - ClientEvent clientEvent = new ClientEvent( - this.clients.get(clientId), event, convertedValue); + if (this.subscriptionRegistry.isClientSubscribedToEvent(clientId, event.event())) { + ClientEvent clientEvent = new ClientEvent(this.clients.get(clientId), event, convertedValue); this.sendQueue.put(clientEvent); this.listener.afterEventQueued(clientEvent, true); } @@ -241,8 +232,7 @@ private void reScheduleFailedEvents() { this.errorQueue.drainTo(failedEvents); for (ClientEvent sseClientEvent : failedEvents) { - if (this.subscriptionRegistry.isClientSubscribedToEvent( - sseClientEvent.getClient().getId(), + if (this.subscriptionRegistry.isClientSubscribedToEvent(sseClientEvent.getClient().getId(), sseClientEvent.getSseEvent().event())) { try { this.sendQueue.put(sseClientEvent); @@ -250,16 +240,14 @@ private void reScheduleFailedEvents() { this.listener.afterEventQueued(sseClientEvent, false); } catch (Exception e) { - LogFactory.getLog(SseEventBus.class) - .error("calling afterEventQueued hook failed", e); + LogFactory.getLog(SseEventBus.class).error("calling afterEventQueued hook failed", e); } } catch (InterruptedException ie) { throw new RuntimeException(ie); } catch (Exception e) { - LogFactory.getLog(SseEventBus.class) - .error("re-adding event into send queue failed", e); + LogFactory.getLog(SseEventBus.class).error("re-adding event into send queue failed", e); try { this.errorQueue.put(sseClientEvent); } @@ -271,8 +259,7 @@ private void reScheduleFailedEvents() { } } catch (Exception e) { - LogFactory.getLog(SseEventBus.class).error("reScheduleFailedEvents failed", - e); + LogFactory.getLog(SseEventBus.class).error("reScheduleFailedEvents failed", e); } } @@ -289,8 +276,7 @@ private void eventLoop() { this.listener.afterEventSent(clientEvent, null); } catch (Exception ex) { - LogFactory.getLog(SseEventBus.class) - .error("calling afterEventSent hook failed", ex); + LogFactory.getLog(SseEventBus.class).error("calling afterEventSent hook failed", ex); } } else { @@ -305,8 +291,7 @@ private void eventLoop() { this.listener.afterEventSent(clientEvent, e); } catch (Exception ex) { - LogFactory.getLog(SseEventBus.class) - .error("calling afterEventSent hook failed", ex); + LogFactory.getLog(SseEventBus.class).error("calling afterEventSent hook failed", ex); } } } @@ -314,12 +299,10 @@ private void eventLoop() { String clientId = clientEvent.getClient().getId(); this.unregisterClient(clientId); try { - this.listener.afterClientsUnregistered( - Collections.singleton(clientId)); + this.listener.afterClientsUnregistered(Collections.singleton(clientId)); } catch (Exception ex) { - LogFactory.getLog(SseEventBus.class).error( - "calling afterClientsUnregistered hook failed", ex); + LogFactory.getLog(SseEventBus.class).error("calling afterClientsUnregistered hook failed", ex); } } } @@ -360,8 +343,7 @@ private String convertObject(SseEvent event) { private void cleanUpClients() { if (!this.clients.isEmpty()) { - long expirationTime = System.currentTimeMillis() - - this.clientExpiration.toMillis(); + long expirationTime = System.currentTimeMillis() - this.clientExpiration.toMillis(); Iterator> it = this.clients.entrySet().iterator(); Set staleClients = new HashSet<>(); while (it.hasNext()) { diff --git a/src/main/java/ch/rasc/sse/eventbus/SseEventBusListener.java b/src/main/java/ch/rasc/sse/eventbus/SseEventBusListener.java index e807bc2..acd86e4 100644 --- a/src/main/java/ch/rasc/sse/eventbus/SseEventBusListener.java +++ b/src/main/java/ch/rasc/sse/eventbus/SseEventBusListener.java @@ -50,4 +50,5 @@ default void afterEventSent(ClientEvent clientEvent, Exception exception) { default void afterClientsUnregistered(Set clientIds) { // no default implementation } + } diff --git a/src/main/java/ch/rasc/sse/eventbus/config/SseEventBusConfigurer.java b/src/main/java/ch/rasc/sse/eventbus/config/SseEventBusConfigurer.java index 79a749b..d1155e5 100644 --- a/src/main/java/ch/rasc/sse/eventbus/config/SseEventBusConfigurer.java +++ b/src/main/java/ch/rasc/sse/eventbus/config/SseEventBusConfigurer.java @@ -86,7 +86,9 @@ default BlockingQueue sendQueue() { return new LinkedBlockingQueue<>(); } - default ConcurrentMap clients() {return new ConcurrentHashMap<>();} + default ConcurrentMap clients() { + return new ConcurrentHashMap<>(); + } default SseEventBusListener listener() { return new SseEventBusListener() { diff --git a/src/test/java/ch/rasc/sse/eventbus/IntegrationTest.java b/src/test/java/ch/rasc/sse/eventbus/IntegrationTest.java index 32c4dc2..6d2dd47 100644 --- a/src/test/java/ch/rasc/sse/eventbus/IntegrationTest.java +++ b/src/test/java/ch/rasc/sse/eventbus/IntegrationTest.java @@ -46,8 +46,7 @@ import okhttp3.ResponseBody; @SuppressWarnings("resource") -@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, - classes = TestDefaultConfiguration.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = TestDefaultConfiguration.class) public class IntegrationTest { @LocalServerPort @@ -104,14 +103,12 @@ public void testRegisterAndSubscribeOnly() throws IOException, InterruptedExcept assertThat(this.eventBus.hasSubscribers("event1")).isTrue(); assertThat(this.eventBus.getSubscribers("event2")).containsOnly("1"); assertThat(this.eventBus.countSubscribers("event1")).isEqualTo(1); - assertThat(this.eventBus.getAllSubscriptions()).containsOnlyKeys("event1", - "event2"); + assertThat(this.eventBus.getAllSubscriptions()).containsOnlyKeys("event1", "event2"); this.eventPublisher.publishEvent(SseEvent.of("event1", "payload1")); this.eventPublisher.publishEvent(SseEvent.of("event2", "payload2")); - assertSseResponse(sseResponse2, new ResponseData("event1", "payload1"), - new ResponseData("event2", "payload2")); + assertSseResponse(sseResponse2, new ResponseData("event1", "payload1"), new ResponseData("event2", "payload2")); SubscribeResponse sseResponse = registerAndSubscribeOnly("1", "event3", 1); this.eventPublisher.publishEvent(SseEvent.of("event1", "payload1")); @@ -135,8 +132,13 @@ public void testOneClientOneEventEmptyData() throws IOException { @Test public void testOneClientOneEventAdditionalInfo() throws IOException { SubscribeResponse sseResponse = registerSubscribe("1", "eventName"); - SseEvent sseEvent = SseEvent.builder().event("eventName").data("the data line") - .id("123").retry(Duration.ofMillis(1000L)).comment("the comment").build(); + SseEvent sseEvent = SseEvent.builder() + .event("eventName") + .data("the data line") + .id("123") + .retry(Duration.ofMillis(1000L)) + .comment("the comment") + .build(); this.eventPublisher.publishEvent(sseEvent); assertSseResponse(sseResponse, new ResponseData("eventName", "the data line")); // "id:123", "retry:1000", @@ -147,8 +149,7 @@ public void testOneClientOneEventAdditionalInfo() throws IOException { @Test public void testOneClientOneDirectEvent() throws IOException { SubscribeResponse sseResponse = registerSubscribe("1", "eventName"); - SseEvent sseEvent = SseEvent.builder().addClientId("1").event("eventName") - .data("payload").build(); + SseEvent sseEvent = SseEvent.builder().addClientId("1").event("eventName").data("payload").build(); this.eventPublisher.publishEvent(sseEvent); assertSseResponse(sseResponse, new ResponseData("eventName", "payload")); sseResponse.eventSource().close(); @@ -165,8 +166,7 @@ public void testOneClientNoEvent() throws IOException { @Test public void testOneClientOneDirectEventToSomebodyElse() throws IOException { SubscribeResponse sseResponse = registerSubscribe("1", "eventName"); - SseEvent sseEvent = SseEvent.builder().addClientId("2").event("eventName") - .data("payload").build(); + SseEvent sseEvent = SseEvent.builder().addClientId("2").event("eventName").data("payload").build(); this.eventPublisher.publishEvent(sseEvent); assertSseResponse(sseResponse); sseResponse.eventSource().close(); @@ -174,8 +174,7 @@ public void testOneClientOneDirectEventToSomebodyElse() throws IOException { @Test public void testOneClientTwoEvents() throws IOException { - SubscribeResponse sseResponse = registerSubscribe("1", "eventName", false, 2, - true); + SubscribeResponse sseResponse = registerSubscribe("1", "eventName", false, 2, true); this.eventPublisher.publishEvent(SseEvent.of("eventName", "payload1")); this.eventPublisher.publishEvent(SseEvent.of("eventName", "payload2")); assertSseResponse(sseResponse, new ResponseData("eventName", "payload1"), @@ -185,14 +184,11 @@ public void testOneClientTwoEvents() throws IOException { @Test public void testOneClientTwoDirectEvents() throws IOException { - SubscribeResponse sseResponse = registerSubscribe("1", "eventName", false, 2, - true); + SubscribeResponse sseResponse = registerSubscribe("1", "eventName", false, 2, true); - SseEvent sseEvent = SseEvent.builder().addClientId("1").event("eventName") - .data("payload1").build(); + SseEvent sseEvent = SseEvent.builder().addClientId("1").event("eventName").data("payload1").build(); this.eventPublisher.publishEvent(sseEvent); - sseEvent = SseEvent.builder().addClientId("1").event("eventName").data("payload2") - .build(); + sseEvent = SseEvent.builder().addClientId("1").event("eventName").data("payload2").build(); this.eventPublisher.publishEvent(sseEvent); assertSseResponse(sseResponse, new ResponseData("eventName", "payload1"), @@ -202,16 +198,13 @@ public void testOneClientTwoDirectEvents() throws IOException { } @Test - public void testOneClientOneDirectEventToHimAndOneToSomebodyElse() - throws IOException { + public void testOneClientOneDirectEventToHimAndOneToSomebodyElse() throws IOException { SubscribeResponse sseResponse = registerSubscribe("1", "eventName"); - SseEvent sseEvent = SseEvent.builder().addClientId("1").event("eventName") - .data("payload1").build(); + SseEvent sseEvent = SseEvent.builder().addClientId("1").event("eventName").data("payload1").build(); this.eventPublisher.publishEvent(sseEvent); - sseEvent = SseEvent.builder().addClientId("2").event("eventName").data("payload2") - .build(); + sseEvent = SseEvent.builder().addClientId("2").event("eventName").data("payload2").build(); this.eventPublisher.publishEvent(sseEvent); assertSseResponse(sseResponse, new ResponseData("eventName", "payload1")); @@ -232,10 +225,8 @@ public void testTwoClientsOneAllEvent() throws IOException { @Test public void testTwoClientsTwoAllEvent() throws IOException { - SubscribeResponse sseResponse1 = registerSubscribe("1", "eventName", false, 2, - true); - SubscribeResponse sseResponse2 = registerSubscribe("2", "eventName", false, 2, - true); + SubscribeResponse sseResponse1 = registerSubscribe("1", "eventName", false, 2, true); + SubscribeResponse sseResponse2 = registerSubscribe("2", "eventName", false, 2, true); this.eventPublisher.publishEvent(SseEvent.of("eventName", "payload1")); this.eventPublisher.publishEvent(SseEvent.of("eventName", "payload2")); assertSseResponse(sseResponse1, new ResponseData("eventName", "payload1"), @@ -252,12 +243,10 @@ public void testTwoClientsTwoDirectEventToOneOfThem() throws IOException { SubscribeResponse sseResponse1 = registerSubscribe("1", "eventName"); SubscribeResponse sseResponse2 = registerSubscribe("2", "eventName"); - SseEvent sseEvent = SseEvent.builder().addClientId("2").event("eventName") - .data("payload1").build(); + SseEvent sseEvent = SseEvent.builder().addClientId("2").event("eventName").data("payload1").build(); this.eventPublisher.publishEvent(sseEvent); - sseEvent = SseEvent.builder().addClientId("2").event("eventName").data("payload2") - .build(); + sseEvent = SseEvent.builder().addClientId("2").event("eventName").data("payload2").build(); this.eventPublisher.publishEvent(sseEvent); assertSseResponse(sseResponse1); @@ -274,11 +263,9 @@ public void testThreeClientsGroupEventToTwoOfThem() throws IOException { SubscribeResponse sseResponse2 = registerSubscribe("2", "eventName"); SubscribeResponse sseResponse3 = registerSubscribe("3", "eventName"); - SseEvent sseEvent = SseEvent.builder().addClientIds("2", "3").event("eventName") - .data("payload1").build(); + SseEvent sseEvent = SseEvent.builder().addClientIds("2", "3").event("eventName").data("payload1").build(); this.eventPublisher.publishEvent(sseEvent); - sseEvent = SseEvent.builder().addClientIds("2", "3").event("eventName") - .data("payload2").build(); + sseEvent = SseEvent.builder().addClientIds("2", "3").event("eventName").data("payload2").build(); this.eventPublisher.publishEvent(sseEvent); assertSseResponse(sseResponse1); assertSseResponse(sseResponse2, new ResponseData("eventName", "payload1"), @@ -297,13 +284,19 @@ public void testThreeClientsGroupEventToTwoOfThemIgnoreExclude() throws IOExcept SubscribeResponse sseResponse2 = registerSubscribe("2", "eventName"); SubscribeResponse sseResponse3 = registerSubscribe("3", "eventName"); - SseEvent sseEvent = SseEvent.builder().addClientIds("2", "3") - .addExcludeClientIds("2", "1").event("eventName").data("payload1") - .build(); + SseEvent sseEvent = SseEvent.builder() + .addClientIds("2", "3") + .addExcludeClientIds("2", "1") + .event("eventName") + .data("payload1") + .build(); this.eventPublisher.publishEvent(sseEvent); - sseEvent = SseEvent.builder().addClientIds("2", "3") - .addExcludeClientIds("3", "2", "1").event("eventName").data("payload2") - .build(); + sseEvent = SseEvent.builder() + .addClientIds("2", "3") + .addExcludeClientIds("3", "2", "1") + .event("eventName") + .data("payload2") + .build(); this.eventPublisher.publishEvent(sseEvent); assertSseResponse(sseResponse1); assertSseResponse(sseResponse2, new ResponseData("eventName", "payload1"), @@ -322,11 +315,9 @@ public void testThreeClientsSendExcludeOne() throws IOException { SubscribeResponse sseResponse2 = registerSubscribe("2", "eventName"); SubscribeResponse sseResponse3 = registerSubscribe("3", "eventName", 2); - SseEvent sseEvent = SseEvent.builder().addExcludeClientId("2").event("eventName") - .data("payload1").build(); + SseEvent sseEvent = SseEvent.builder().addExcludeClientId("2").event("eventName").data("payload1").build(); this.eventPublisher.publishEvent(sseEvent); - sseEvent = SseEvent.builder().addExcludeClientId("1").event("eventName") - .data("payload2").build(); + sseEvent = SseEvent.builder().addExcludeClientId("1").event("eventName").data("payload2").build(); this.eventPublisher.publishEvent(sseEvent); assertSseResponse(sseResponse1, new ResponseData("eventName", "payload1")); assertSseResponse(sseResponse2, new ResponseData("eventName", "payload2")); @@ -344,11 +335,13 @@ public void testThreeClientsSendExcludeMultiple() throws IOException { SubscribeResponse sseResponse2 = registerSubscribe("2", "eventName"); SubscribeResponse sseResponse3 = registerSubscribe("3", "eventName"); - SseEvent sseEvent = SseEvent.builder().addExcludeClientIds("2", "3") - .event("eventName").data("payload1").build(); + SseEvent sseEvent = SseEvent.builder() + .addExcludeClientIds("2", "3") + .event("eventName") + .data("payload1") + .build(); this.eventPublisher.publishEvent(sseEvent); - sseEvent = SseEvent.builder().addExcludeClientIds("1", "3").event("eventName") - .data("payload2").build(); + sseEvent = SseEvent.builder().addExcludeClientIds("1", "3").event("eventName").data("payload2").build(); this.eventPublisher.publishEvent(sseEvent); assertSseResponse(sseResponse1, new ResponseData("eventName", "payload1")); assertSseResponse(sseResponse2, new ResponseData("eventName", "payload2")); @@ -378,8 +371,7 @@ public void testMultipleSubscriptions() throws IOException { @Test @Disabled public void testReconnect() throws IOException { - SubscribeResponse sseResponse = registerSubscribe("1", "eventName", true, 1, - true); + SubscribeResponse sseResponse = registerSubscribe("1", "eventName", true, 1, true); sleep(2, TimeUnit.SECONDS); // assertSseResponseWithException(sseResponse); sleep(2, TimeUnit.SECONDS); @@ -390,8 +382,7 @@ public void testReconnect() throws IOException { assertThat(this.eventBus.countSubscribers("eventName")).isEqualTo(1); assertThat(this.eventBus.getAllSubscriptions()).containsOnlyKeys("eventName"); - SseEvent sseEvent = SseEvent.builder().event("eventName").data("payload1") - .build(); + SseEvent sseEvent = SseEvent.builder().event("eventName").data("payload1").build(); this.eventPublisher.publishEvent(sseEvent); sseEvent = SseEvent.builder().event("eventName").data("payload2").build(); this.eventPublisher.publishEvent(sseEvent); @@ -400,8 +391,7 @@ public void testReconnect() throws IOException { sseResponse = registerSubscribe("1", "eventName", 3); assertSseResponse(sseResponse, new ResponseData("eventName", "payload1"), - new ResponseData("eventName", "payload2"), - new ResponseData("eventName", "payload3")); + new ResponseData("eventName", "payload2"), new ResponseData("eventName", "payload3")); assertThat(this.eventBus.getAllClientIds()).hasSize(1); assertThat(this.eventBus.getAllEvents()).containsOnly("eventName"); assertThat(this.eventBus.hasSubscribers("eventName")).isTrue(); @@ -418,8 +408,7 @@ public void testReconnect() throws IOException { sseResponse = registerSubscribe("1", "eventName", 3); assertSseResponse(sseResponse, new ResponseData("eventName", "payload4"), - new ResponseData("eventName", "payload5"), - new ResponseData("eventName", "payload6")); + new ResponseData("eventName", "payload5"), new ResponseData("eventName", "payload6")); assertThat(this.eventBus.getAllClientIds()).hasSize(1); assertThat(this.eventBus.getAllEvents()).containsOnly("eventName"); assertThat(this.eventBus.hasSubscribers("eventName")).isTrue(); @@ -465,12 +454,10 @@ public void testClientExpiration() throws IOException { public void testMany() throws IOException { List responses = new ArrayList<>(); for (int i = 0; i < 100; i++) { - responses.add( - registerSubscribe(String.valueOf(i), "eventName", false, 1, false)); + responses.add(registerSubscribe(String.valueOf(i), "eventName", false, 1, false)); } for (int i = 100; i < 120; i++) { - responses.add( - registerSubscribe(String.valueOf(i), "eventName", true, 1, false)); + responses.add(registerSubscribe(String.valueOf(i), "eventName", true, 1, false)); } sleep(3, TimeUnit.SECONDS); @@ -505,8 +492,7 @@ public void testJsonConverter() throws IOException { TestObject1 to1 = new TestObject1(101L, "john doe"); this.eventPublisher.publishEvent(SseEvent.of("to1", to1)); - assertSseResponse(sseResponse, - new ResponseData("to1", "{\"id\":101,\"name\":\"john doe\"}")); + assertSseResponse(sseResponse, new ResponseData("to1", "{\"id\":101,\"name\":\"john doe\"}")); sseResponse.eventSource().close(); } @@ -538,19 +524,18 @@ public void testJsonViewNoView() throws IOException { } @Test - public void testJsonViewPublicView() - throws IOException, InterruptedException, ExecutionException { + public void testJsonViewPublicView() throws IOException, InterruptedException, ExecutionException { SubscribeResponse sseResponse = registerSubscribe("1", "jsonView1"); TestObject3 to3 = new TestObject3(); to3.setPrivateData(23); to3.setPublicInfo("this is public"); to3.setUuid("abc"); - this.eventPublisher.publishEvent(SseEvent.builder().event("jsonView1").data(to3) - .jsonView(JsonViews.PUBLIC.class).build()); + this.eventPublisher + .publishEvent(SseEvent.builder().event("jsonView1").data(to3).jsonView(JsonViews.PUBLIC.class).build()); - assertSseResponse(sseResponse, new ResponseData("jsonView1", - "{\"uuid\":\"abc\",\"publicInfo\":\"this is public\"}")); + assertSseResponse(sseResponse, + new ResponseData("jsonView1", "{\"uuid\":\"abc\",\"publicInfo\":\"this is public\"}")); sseResponse.eventSource().close(); } @@ -563,8 +548,8 @@ public void testJsonViewPrivateView() throws IOException { to3.setPublicInfo("this is public"); to3.setUuid("abc"); - this.eventPublisher.publishEvent(SseEvent.builder().event("jsonView1").data(to3) - .jsonView(JsonViews.PRIVATE.class).build()); + this.eventPublisher + .publishEvent(SseEvent.builder().event("jsonView1").data(to3).jsonView(JsonViews.PRIVATE.class).build()); assertSseResponse(sseResponse, new ResponseData("jsonView1", "{\"uuid\":\"abc\",\"publicInfo\":\"this is public\",\"privateData\":23}")); @@ -590,7 +575,9 @@ private static OkHttpClient createHttpClient() { private static OkHttpClient createHttpClient(long timeout, TimeUnit timeUnit) { return new OkHttpClient.Builder().connectTimeout(timeout, timeUnit) - .writeTimeout(timeout, timeUnit).readTimeout(timeout, timeUnit).build(); + .writeTimeout(timeout, timeUnit) + .readTimeout(timeout, timeUnit) + .build(); } private static void assertSseResponseWithException(Response response) { @@ -610,8 +597,7 @@ private static void assertSseResponseWithException(Response response) { } } - private static void assertSseResponse(SubscribeResponse response, - ResponseData... expected) { + private static void assertSseResponse(SubscribeResponse response, ResponseData... expected) { try { List rds; try { @@ -632,29 +618,26 @@ private static void assertSseResponse(SubscribeResponse response, } } - private SubscribeResponse registerSubscribe(String clientId, String eventName) - throws IOException { + private SubscribeResponse registerSubscribe(String clientId, String eventName) throws IOException { return registerSubscribe(clientId, eventName, false, 1, true); } - private SubscribeResponse registerSubscribe(String clientId, String eventName, - int expectedNoOfData) throws IOException { + private SubscribeResponse registerSubscribe(String clientId, String eventName, int expectedNoOfData) + throws IOException { return registerSubscribe(clientId, eventName, false, expectedNoOfData, true); } - private SubscribeResponse registerSubscribe(String clientId, String eventName, - boolean shortTimeout, int expectedNoOfData, boolean sleep) - throws IOException { + private SubscribeResponse registerSubscribe(String clientId, String eventName, boolean shortTimeout, + int expectedNoOfData, boolean sleep) throws IOException { CompletableFuture> dataFuture = new CompletableFuture<>(); List responses = new ArrayList<>(); - EventSource.Builder builder = new EventSource.Builder( - (DefaultEventHandler) (event, messageEvent) -> { - responses.add(new ResponseData(event, messageEvent.getData())); - if (responses.size() == expectedNoOfData) { - dataFuture.complete(responses); - } - }, URI.create(testUrl("/register/" + clientId))); + EventSource.Builder builder = new EventSource.Builder((DefaultEventHandler) (event, messageEvent) -> { + responses.add(new ResponseData(event, messageEvent.getData())); + if (responses.size() == expectedNoOfData) { + dataFuture.complete(responses); + } + }, URI.create(testUrl("/register/" + clientId))); EventSource eventSource = builder.build(); eventSource.start(); @@ -665,9 +648,8 @@ private SubscribeResponse registerSubscribe(String clientId, String eventName, else { client = createHttpClient(); } - client.newCall(new Request.Builder().get() - .url(testUrl("/subscribe/" + clientId + "/" + eventName)).build()) - .execute(); + client.newCall(new Request.Builder().get().url(testUrl("/subscribe/" + clientId + "/" + eventName)).build()) + .execute(); if (sleep) { sleep(333, TimeUnit.MILLISECONDS); @@ -678,39 +660,34 @@ private SubscribeResponse registerSubscribe(String clientId, String eventName, private void subscribe(String clientId, String eventName) throws IOException { OkHttpClient client = createHttpClient(); - client.newCall(new Request.Builder().get() - .url(testUrl("/subscribe/" + clientId + "/" + eventName)).build()) - .execute(); + client.newCall(new Request.Builder().get().url(testUrl("/subscribe/" + clientId + "/" + eventName)).build()) + .execute(); } - private SubscribeResponse registerAndSubscribe(String clientId, String eventName, - int expectedNoOfData) { + private SubscribeResponse registerAndSubscribe(String clientId, String eventName, int expectedNoOfData) { CompletableFuture> dataFuture = new CompletableFuture<>(); List responses = new ArrayList<>(); - EventSource.Builder builder = new EventSource.Builder( - (DefaultEventHandler) (event, messageEvent) -> { - responses.add(new ResponseData(event, messageEvent.getData())); - if (responses.size() == expectedNoOfData) { - dataFuture.complete(responses); - } - }, URI.create(testUrl("/register/" + clientId + "/" + eventName))); + EventSource.Builder builder = new EventSource.Builder((DefaultEventHandler) (event, messageEvent) -> { + responses.add(new ResponseData(event, messageEvent.getData())); + if (responses.size() == expectedNoOfData) { + dataFuture.complete(responses); + } + }, URI.create(testUrl("/register/" + clientId + "/" + eventName))); EventSource eventSource = builder.build(); eventSource.start(); return new SubscribeResponse(eventSource, dataFuture); } - private SubscribeResponse registerAndSubscribeOnly(String clientId, String eventName, - int expectedNoOfData) { + private SubscribeResponse registerAndSubscribeOnly(String clientId, String eventName, int expectedNoOfData) { CompletableFuture> dataFuture = new CompletableFuture<>(); List responses = new ArrayList<>(); - EventSource.Builder builder = new EventSource.Builder( - (DefaultEventHandler) (event, messageEvent) -> { - responses.add(new ResponseData(event, messageEvent.getData())); - if (responses.size() == expectedNoOfData) { - dataFuture.complete(responses); - } - }, URI.create(testUrl("/registerOnly/" + clientId + "/" + eventName))); + EventSource.Builder builder = new EventSource.Builder((DefaultEventHandler) (event, messageEvent) -> { + responses.add(new ResponseData(event, messageEvent.getData())); + if (responses.size() == expectedNoOfData) { + dataFuture.complete(responses); + } + }, URI.create(testUrl("/registerOnly/" + clientId + "/" + eventName))); EventSource eventSource = builder.connectTimeout(10, TimeUnit.SECONDS).build(); eventSource.start(); @@ -726,10 +703,10 @@ private static void sleep(long value, TimeUnit timeUnit) { } } - static record SubscribeResponse(EventSource eventSource, - CompletableFuture> dataFuture) { + static record SubscribeResponse(EventSource eventSource, CompletableFuture> dataFuture) { } static record ResponseData(String event, String data) { } + } diff --git a/src/test/java/ch/rasc/sse/eventbus/ListenerTest.java b/src/test/java/ch/rasc/sse/eventbus/ListenerTest.java index 26560e6..ab72f86 100644 --- a/src/test/java/ch/rasc/sse/eventbus/ListenerTest.java +++ b/src/test/java/ch/rasc/sse/eventbus/ListenerTest.java @@ -46,8 +46,7 @@ import okhttp3.ResponseBody; @SuppressWarnings("resource") -@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, - classes = TestDefaultConfiguration.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = TestDefaultConfiguration.class) public class ListenerTest { @LocalServerPort @@ -131,8 +130,7 @@ public void testReconnect() throws IOException { // assertSseResponseWithException(sseResponse); // sleep(2, TimeUnit.SECONDS); - SseEvent sseEvent = SseEvent.builder().event("eventName").data("payload1") - .build(); + SseEvent sseEvent = SseEvent.builder().event("eventName").data("payload1").build(); this.eventBus.handleEvent(sseEvent); sseEvent = SseEvent.builder().event("eventName").data("payload2").build(); this.eventBus.handleEvent(sseEvent); @@ -143,8 +141,7 @@ public void testReconnect() throws IOException { sseResponse = registerSubscribe("1", "eventName", 3); assertSseResponse(sseResponse, new ResponseData("eventName", "payload1"), - new ResponseData("eventName", "payload2"), - new ResponseData("eventName", "payload3")); + new ResponseData("eventName", "payload2"), new ResponseData("eventName", "payload3")); assertThat(this.testListener.getAfterEventQueuedFirst()).hasSize(3); assertThat(this.testListener.getAfterEventSentOk()).hasSize(3); @@ -165,11 +162,12 @@ private static OkHttpClient createHttpClient() { private static OkHttpClient createHttpClient(long timeout, TimeUnit timeUnit) { return new OkHttpClient.Builder().connectTimeout(timeout, timeUnit) - .writeTimeout(timeout, timeUnit).readTimeout(timeout, timeUnit).build(); + .writeTimeout(timeout, timeUnit) + .readTimeout(timeout, timeUnit) + .build(); } - private static void assertSseResponse(SubscribeResponse response, - ResponseData... expected) { + private static void assertSseResponse(SubscribeResponse response, ResponseData... expected) { try { List rds; try { @@ -207,23 +205,22 @@ private static void assertSseResponseWithException(Response response) { } } - private SubscribeResponse registerSubscribe(String clientId, String eventName, - int expectedNoOfData) throws IOException { + private SubscribeResponse registerSubscribe(String clientId, String eventName, int expectedNoOfData) + throws IOException { return registerSubscribe(clientId, eventName, false, expectedNoOfData); } - private SubscribeResponse registerSubscribe(String clientId, String eventName, - boolean shortTimeout, int expectedNoOfData) throws IOException { + private SubscribeResponse registerSubscribe(String clientId, String eventName, boolean shortTimeout, + int expectedNoOfData) throws IOException { CompletableFuture> dataFuture = new CompletableFuture<>(); List responses = new ArrayList<>(); - EventSource.Builder builder = new EventSource.Builder( - (DefaultEventHandler) (event, messageEvent) -> { - responses.add(new ResponseData(event, messageEvent.getData())); - if (responses.size() == expectedNoOfData) { - dataFuture.complete(responses); - } - }, URI.create(testUrl("/register/" + clientId))); + EventSource.Builder builder = new EventSource.Builder((DefaultEventHandler) (event, messageEvent) -> { + responses.add(new ResponseData(event, messageEvent.getData())); + if (responses.size() == expectedNoOfData) { + dataFuture.complete(responses); + } + }, URI.create(testUrl("/register/" + clientId))); EventSource eventSource = builder.build(); eventSource.start(); @@ -234,9 +231,8 @@ private SubscribeResponse registerSubscribe(String clientId, String eventName, else { client = createHttpClient(); } - client.newCall(new Request.Builder().get() - .url(testUrl("/subscribe/" + clientId + "/" + eventName)).build()) - .execute(); + client.newCall(new Request.Builder().get().url(testUrl("/subscribe/" + clientId + "/" + eventName)).build()) + .execute(); sleep(333, TimeUnit.MILLISECONDS); return new SubscribeResponse(eventSource, dataFuture); } diff --git a/src/test/java/ch/rasc/sse/eventbus/SseEventBusSchedulerTest.java b/src/test/java/ch/rasc/sse/eventbus/SseEventBusSchedulerTest.java index 4b6abf9..3f72163 100644 --- a/src/test/java/ch/rasc/sse/eventbus/SseEventBusSchedulerTest.java +++ b/src/test/java/ch/rasc/sse/eventbus/SseEventBusSchedulerTest.java @@ -65,6 +65,7 @@ public ConcurrentMap clients() { public ScheduledExecutorService taskScheduler() { return null; } + } @Autowired @@ -101,4 +102,5 @@ public void testClientRegistrationShouldNotExpireIfSchedulerIsNull() { assertThat(this.eventBus.getAllEvents()).isEmpty(); assertThat(this.eventBus.getAllSubscriptions()).isEmpty(); } + } diff --git a/src/test/java/ch/rasc/sse/eventbus/SseEventBusTest.java b/src/test/java/ch/rasc/sse/eventbus/SseEventBusTest.java index f2f7cbe..cd984a1 100644 --- a/src/test/java/ch/rasc/sse/eventbus/SseEventBusTest.java +++ b/src/test/java/ch/rasc/sse/eventbus/SseEventBusTest.java @@ -45,7 +45,6 @@ public class SseEventBusTest { @EnableSseEventBus static class Config implements SseEventBusConfigurer { - @Override public Duration clientExpiration() { return Duration.ofSeconds(5); @@ -60,6 +59,7 @@ public int noOfSendResponseTries() { public ConcurrentMap clients() { return CLIENTS_MAP; } + } @Autowired @@ -137,16 +137,14 @@ public void testClientRegisterAndSubscribe() { assertThat(CLIENTS_MAP.get("2").sseEmitter()).isEqualTo(se2); assertThat(CLIENTS_MAP.get("3").sseEmitter()).isEqualTo(se3); - assertThat(this.eventBus.getAllEvents()).containsOnly("one", "two", "two2", - "three"); + assertThat(this.eventBus.getAllEvents()).containsOnly("one", "two", "two2", "three"); assertThat(this.eventBus.getSubscribers("one")).containsExactly("1", "3"); assertThat(this.eventBus.getSubscribers("two")).containsExactly("2"); assertThat(this.eventBus.getSubscribers("two2")).containsExactly("2"); assertThat(this.eventBus.getSubscribers("three")).containsExactly("3"); this.eventBus.unsubscribe("1", "x"); - assertThat(this.eventBus.getAllEvents()).containsOnly("one", "two", "two2", - "three"); + assertThat(this.eventBus.getAllEvents()).containsOnly("one", "two", "two2", "three"); assertThat(this.eventBus.getSubscribers("one")).containsExactly("1", "3"); assertThat(this.eventBus.getSubscribers("two")).containsExactly("2"); assertThat(this.eventBus.getSubscribers("two2")).containsExactly("2"); @@ -173,8 +171,7 @@ public void testClientRegisterAndSubscribeTimeout() { this.eventBus.createSseEmitter("1", "one"); this.eventBus.createSseEmitter("2", "two", "two2"); this.eventBus.createSseEmitter("3", "one", "three"); - assertThat(this.eventBus.getAllEvents()).containsOnly("one", "two", "two2", - "three"); + assertThat(this.eventBus.getAllEvents()).containsOnly("one", "two", "two2", "three"); sleep(11, TimeUnit.SECONDS); assertThat(this.eventBus.getAllClientIds()).isEmpty(); assertThat(this.eventBus.getAllEvents()).isEmpty(); @@ -218,8 +215,7 @@ public void testClientRegisterAndSubscribeOnly() { assertThat(CLIENTS_MAP.get("1").sseEmitter()).isEqualTo(se1); assertThat(CLIENTS_MAP.get("2").sseEmitter()).isEqualTo(se2); - assertThat(this.eventBus.getAllEvents()).containsOnly("one", "three", "four", - "five"); + assertThat(this.eventBus.getAllEvents()).containsOnly("one", "three", "four", "five"); assertThat(this.eventBus.getSubscribers("one")).containsExactly("1"); assertThat(this.eventBus.getSubscribers("three")).containsExactly("2"); assertThat(this.eventBus.getSubscribers("four")).containsExactly("2"); diff --git a/src/test/java/ch/rasc/sse/eventbus/TestController.java b/src/test/java/ch/rasc/sse/eventbus/TestController.java index badc5e3..ba6dac3 100644 --- a/src/test/java/ch/rasc/sse/eventbus/TestController.java +++ b/src/test/java/ch/rasc/sse/eventbus/TestController.java @@ -36,14 +36,12 @@ public SseEmitter eventbus(@PathVariable("id") String id) { } @GetMapping("/register/{id}/{event}") - public SseEmitter eventbus(@PathVariable("id") String id, - @PathVariable("event") String event) { + public SseEmitter eventbus(@PathVariable("id") String id, @PathVariable("event") String event) { return this.eventBus.createSseEmitter(id, 30_000L, event.split(",")); } @GetMapping("/registerOnly/{id}/{event}") - public SseEmitter eventbusOnly(@PathVariable("id") String id, - @PathVariable("event") String event) { + public SseEmitter eventbusOnly(@PathVariable("id") String id, @PathVariable("event") String event) { return this.eventBus.createSseEmitter(id, 3_000L, true, event.split(",")); } @@ -55,8 +53,7 @@ public void unregister(@PathVariable("id") String id) { @ResponseBody @GetMapping("/subscribe/{id}/{event}") - public void subscribe(@PathVariable("id") String id, - @PathVariable("event") String event) { + public void subscribe(@PathVariable("id") String id, @PathVariable("event") String event) { String[] splittedEvents = event.split(","); for (String e : splittedEvents) { this.eventBus.subscribe(id, e); @@ -65,8 +62,7 @@ public void subscribe(@PathVariable("id") String id, @ResponseBody @GetMapping("/unsubscribe/{id}/{event}") - public void unsubscribe(@PathVariable("id") String id, - @PathVariable("event") String event) { + public void unsubscribe(@PathVariable("id") String id, @PathVariable("event") String event) { String[] splittedEvents = event.split(","); for (String e : splittedEvents) { this.eventBus.unsubscribe(id, e); diff --git a/src/test/java/ch/rasc/sse/eventbus/TestListener.java b/src/test/java/ch/rasc/sse/eventbus/TestListener.java index 6d7d216..cbe0c1b 100644 --- a/src/test/java/ch/rasc/sse/eventbus/TestListener.java +++ b/src/test/java/ch/rasc/sse/eventbus/TestListener.java @@ -25,9 +25,13 @@ public class TestListener implements SseEventBusListener { private List afterEventQueuedFirst = new ArrayList<>(); + private List afterEventQueued = new ArrayList<>(); + private List afterEventSentOk = new ArrayList<>(); + private List afterEventSentFail = new ArrayList<>(); + private List afterClientsUnregistered = new ArrayList<>(); @Override @@ -82,4 +86,5 @@ public void reset() { this.afterEventSentFail = new ArrayList<>(); this.afterClientsUnregistered = new ArrayList<>(); } + } diff --git a/src/test/java/ch/rasc/sse/eventbus/TestUtils.java b/src/test/java/ch/rasc/sse/eventbus/TestUtils.java index fcf7ebc..de697e3 100644 --- a/src/test/java/ch/rasc/sse/eventbus/TestUtils.java +++ b/src/test/java/ch/rasc/sse/eventbus/TestUtils.java @@ -27,4 +27,5 @@ public static void sleep(long value, TimeUnit timeUnit) { // nothing here } } + }