Skip to content

Commit

Permalink
RealtimeEventHub: code cleanup (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcom committed Dec 15, 2023
1 parent 32d900f commit 96653a7
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.suffix}</artifactId>
<artifactId>akka-actor-typed_${scala.suffix}</artifactId>
<version>${akka.version}</version>
</dependency>

Expand Down
11 changes: 6 additions & 5 deletions src/main/scala/ru/org/linux/comment/AddCommentController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package ru.org.linux.comment

import akka.actor.ActorRef
import akka.actor.typed.ActorRef
import io.circe.generic.semiauto.*
import io.circe.syntax.*
import io.circe.{Encoder, Json}
Expand Down Expand Up @@ -47,7 +47,8 @@ import scala.jdk.OptionConverters.RichOption
class AddCommentController(ipBlockDao: IPBlockDao, commentPrepareService: CommentPrepareService,
commentService: CommentCreateService, topicPermissionService: TopicPermissionService,
topicPrepareService: TopicPrepareService, searchQueueSender: SearchQueueSender,
@Qualifier("realtimeHubWS") realtimeHubWS: ActorRef, textService: MessageTextService) {
@Qualifier("realtimeHubWS") realtimeHubWS: ActorRef[RealtimeEventHub.Protocol],
textService: MessageTextService) {

@ModelAttribute("ipBlockInfo")
def loadIPBlock(request: HttpServletRequest): IPBlockInfo = ipBlockDao.getBlockInfo(request.getRemoteAddr)
Expand Down Expand Up @@ -83,7 +84,7 @@ class AddCommentController(ipBlockDao: IPBlockDao, commentPrepareService: Commen
val tmpl = Template.getTemplate
val preparedTopic = topicPrepareService.prepareTopic(add.getTopic, currentUser.map(_.user).orNull)

if (!topicPermissionService.isCommentsAllowed(preparedTopic.group, add.getTopic, currentUser.map(_.user).asJava, false))
if (!topicPermissionService.isCommentsAllowed(preparedTopic.group, add.getTopic, currentUser.map(_.user).toJava, false))
throw new AccessViolationException("Это сообщение нельзя комментировать")

if (add.getMode == null) {
Expand Down Expand Up @@ -111,7 +112,7 @@ class AddCommentController(ipBlockDao: IPBlockDao, commentPrepareService: Commen
val comment = commentService.getComment(add, user, request)

if (add.getTopic != null) {
topicPermissionService.checkCommentsAllowed(add.getTopic, Some(user).asJava, errors)
topicPermissionService.checkCommentsAllowed(add.getTopic, Some(user).toJava, errors)
}

val tmpl = Template.getTemplate
Expand Down Expand Up @@ -165,7 +166,7 @@ class AddCommentController(ipBlockDao: IPBlockDao, commentPrepareService: Commen
val comment = commentService.getComment(add, user, request)

if (add.getTopic != null) {
topicPermissionService.checkCommentsAllowed(add.getTopic, Some(user).asJava, errors)
topicPermissionService.checkCommentsAllowed(add.getTopic, Some(user).toJava, errors)
}

if (add.isPreviewMode || errors.hasErrors || comment == null) {
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/ru/org/linux/reaction/ReactionService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package ru.org.linux.reaction

import akka.actor.ActorRef
import akka.actor.typed.ActorRef
import org.joda.time.DateTimeZone
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.scala.transaction.support.TransactionManagement
Expand Down Expand Up @@ -111,7 +111,8 @@ object ReactionService {

@Service
class ReactionService(userService: UserService, reactionDao: ReactionDao, topicDao: TopicDao,
userEventDao: UserEventDao, @Qualifier("realtimeHubWS") realtimeHubWS: ActorRef,
userEventDao: UserEventDao,
@Qualifier("realtimeHubWS") realtimeHubWS: ActorRef[RealtimeEventHub.Protocol],
ignoreListDao: IgnoreListDao, profileDao: ProfileDao, msgbaseDao: MsgbaseDao,
textService: MessageTextService,
val transactionManager: PlatformTransactionManager) extends TransactionManagement {
Expand Down
48 changes: 30 additions & 18 deletions src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
package ru.org.linux.realtime

import akka.Done
import akka.actor.typed.Scheduler
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, PoisonPill, Props, SupervisorStrategy, Terminated, Timers}
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.scalalogging.StrictLogging
import org.springframework.beans.factory.annotation.Qualifier
Expand Down Expand Up @@ -50,10 +51,10 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {

timers.startTimerWithFixedDelay(Tick, Tick, 5.minutes)

override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

override def receive: Receive = {
case SessionStarted(session, user) if !sessions.contains(session.getId) =>
case SessionStarted(session, user, replyTo) if !sessions.contains(session.getId) =>
val actor = context.actorOf(RealtimeSessionActor.props(session))
context.watch(actor)

Expand All @@ -69,13 +70,13 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
maxDataSize = dataSize
}

sender() ! Done
case SubscribeTopic(session, topic) if sessions.contains(session.getId) =>
replyTo ! Done
case SubscribeTopic(session, topic, replyTo) if sessions.contains(session.getId) =>
val actor = sessions(session.getId)

topicSubscriptions += (topic -> actor)

sender() ! Done
replyTo ! Done
case Terminated(actorRef) =>
log.debug(s"RealtimeSessionActor $actorRef terminated")

Expand Down Expand Up @@ -116,13 +117,15 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
}

object RealtimeEventHub {
case class NewComment(msgid: Int, cid: Int)
case class RefreshEvents(users: Set[Int])
case object Tick
sealed trait Protocol

case class SessionStarted(session: WebSocketSession, user: Option[Int])
case class SubscribeTopic(session: WebSocketSession, topic: Int)
case class SessionTerminated(session: String)
case class NewComment(msgid: Int, cid: Int) extends Protocol
case class RefreshEvents(users: Set[Int]) extends Protocol
private[realtime] case object Tick extends Protocol

private[realtime] case class SessionStarted(session: WebSocketSession, user: Option[Int], replyTo: akka.actor.typed.ActorRef[Done.type]) extends Protocol
private[realtime] case class SubscribeTopic(session: WebSocketSession, topic: Int, replyTo: akka.actor.typed.ActorRef[Done.type]) extends Protocol
private[realtime] case class SessionTerminated(session: String) extends Protocol

def props: Props = Props(new RealtimeEventHub())

Expand All @@ -134,7 +137,7 @@ object RealtimeEventHub {
session.sendMessage(new TextMessage(s"events-refresh"))
}

def notifyEvents(realtimeEventHub: ActorRef, users: java.lang.Iterable[Integer]): Unit = {
def notifyEvents(realtimeEventHub: akka.actor.typed.ActorRef[RefreshEvents], users: java.lang.Iterable[Integer]): Unit = {
realtimeEventHub ! RefreshEvents(users.asScala.map(_.toInt).toSet)
}
}
Expand Down Expand Up @@ -176,12 +179,17 @@ object RealtimeSessionActor {
}

@Service
class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef,
topicDao: TopicDao, commentService: CommentReadService) extends TextWebSocketHandler
class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: akka.actor.typed.ActorRef[Protocol],
topicDao: TopicDao, commentService: CommentReadService,
actorSystem: ActorSystem) extends TextWebSocketHandler
with StrictLogging {

private implicit val Timeout: Timeout = 30.seconds

import akka.actor.typed.scaladsl.adapter.*

private implicit val scheduler: Scheduler = actorSystem.toTyped.scheduler

override def afterConnectionEstablished(session: WebSocketSession): Unit = {
try {
val currentUser =
Expand All @@ -191,7 +199,7 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef,

logger.debug(s"Connected! currentUser=${currentUser.map(_.getNick)}")

val result = hub ? SessionStarted(session, currentUser.map(_.getId))
val result = hub.ask(SessionStarted(session, currentUser.map(_.getId), _))

Await.result(result, 10.seconds)
} catch {
Expand Down Expand Up @@ -231,7 +239,7 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef,
notifyComment(session, cid)
}

val result = hub ? SubscribeTopic(session, topic.id)
val result = hub.ask(SubscribeTopic(session, topic.id, _))

Await.result(result, 10.seconds)
} catch {
Expand All @@ -251,7 +259,11 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef,
@Configuration
class RealtimeConfigurationBeans(actorSystem: ActorSystem) {
@Bean(Array("realtimeHubWS"))
def hub: ActorRef = actorSystem.actorOf(RealtimeEventHub.props)
def hub: akka.actor.typed.ActorRef[Protocol] = {
import akka.actor.typed.scaladsl.adapter.*

actorSystem.actorOf(RealtimeEventHub.props)
}
}

@Configuration
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/ru/org/linux/topic/AddTopicController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package ru.org.linux.topic

import akka.actor.ActorRef
import akka.actor.typed.ActorRef
import com.google.common.base.Strings
import org.apache.commons.io.IOUtils
import org.springframework.beans.factory.annotation.Qualifier
Expand Down Expand Up @@ -102,7 +102,8 @@ class AddTopicController(searchQueueSender: SearchQueueSender, captcha: CaptchaS
tagService: TagService, userService: UserService, prepareService: TopicPrepareService,
groupPermissionService: GroupPermissionService,
addTopicRequestValidator: AddTopicRequestValidator, imageService: ImageService,
topicService: TopicService, @Qualifier("realtimeHubWS") realtimeHubWS: ActorRef,
topicService: TopicService,
@Qualifier("realtimeHubWS") realtimeHubWS: ActorRef[RealtimeEventHub.Protocol],
renderService: MarkdownFormatter, groupDao: GroupDao, dupeProtector: FloodProtector,
ipBlockDao: IPBlockDao, servletContext: ServletContext) {
@ModelAttribute("ipBlockInfo")
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/ru/org/linux/topic/EditTopicController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package ru.org.linux.topic

import akka.actor.ActorRef
import akka.actor.typed.ActorRef
import com.google.common.base.Strings
import org.apache.commons.text.StringEscapeUtils
import org.springframework.beans.factory.annotation.Qualifier
Expand Down Expand Up @@ -62,7 +62,8 @@ class EditTopicController(messageDao: TopicDao, searchQueueSender: SearchQueueSe
permissionService: GroupPermissionService, captcha: CaptchaService, msgbaseDao: MsgbaseDao,
editHistoryService: EditHistoryService, imageService: ImageService,
editTopicRequestValidator: EditTopicRequestValidator, ipBlockDao: IPBlockDao,
@Qualifier("realtimeHubWS") realtimeHubWS: ActorRef, tagService: TagService) {
@Qualifier("realtimeHubWS") realtimeHubWS: ActorRef[RealtimeEventHub.Protocol],
tagService: TagService) {
@RequestMapping(value = Array("/commit.jsp"), method = Array(RequestMethod.GET))
def showCommitForm(@RequestParam("msgid") msgid: Int, @ModelAttribute("form") form: EditTopicRequest): ModelAndView = AuthorizedOnly { currentUser =>
val tmpl = Template.getTemplate
Expand Down
8 changes: 5 additions & 3 deletions src/main/scala/ru/org/linux/user/UserEventApiController.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 1998-2022 Linux.org.ru
* Copyright 1998-2023 Linux.org.ru
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -14,10 +14,11 @@
*/
package ru.org.linux.user

import akka.actor.ActorRef
import akka.actor.typed.ActorRef
import com.google.common.collect.ImmutableList
import io.circe.Json
import io.circe.syntax.*
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.{RequestMapping, RequestMethod, RequestParam, ResponseBody}
import ru.org.linux.auth.AuthUtil
Expand All @@ -27,7 +28,8 @@ import ru.org.linux.realtime.RealtimeEventHub
import javax.servlet.http.HttpServletResponse

@Controller
class UserEventApiController(userEventService: UserEventService, realtimeHubWS: ActorRef) {
class UserEventApiController(userEventService: UserEventService,
@Qualifier("realtimeHubWS") realtimeHubWS: ActorRef[RealtimeEventHub.Protocol]) {
@ResponseBody
@RequestMapping(value = Array("/notifications-count"), method = Array(RequestMethod.GET))
def getEventsCount(response: HttpServletResponse): Json = AuthorizedOnly { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package ru.org.linux.topic;

import akka.actor.ActorRef;
import akka.actor.typed.ActorRef;
import com.sksamuel.elastic4s.ElasticClient;
import org.springframework.context.annotation.*;
import org.springframework.stereotype.Controller;
Expand All @@ -28,6 +28,7 @@
import ru.org.linux.exception.ExceptionResolver;
import ru.org.linux.group.GroupDao;
import ru.org.linux.markup.MessageTextService;
import ru.org.linux.realtime.RealtimeEventHub;
import ru.org.linux.realtime.RealtimeWebsocketHandler;
import ru.org.linux.search.MoreLikeThisService;
import ru.org.linux.search.SearchQueueListener;
Expand Down Expand Up @@ -111,7 +112,7 @@ public ElasticClient elasticClient() {
}

@Bean("realtimeHubWS")
public ActorRef realtimeHub() {
public ActorRef<RealtimeEventHub.Protocol> realtimeHub() {
return mock(ActorRef.class);
}
}

0 comments on commit 96653a7

Please sign in to comment.