Skip to content

Commit

Permalink
예약 도메인 캐시 패턴 및 eventBroker 적용 (#20)
Browse files Browse the repository at this point in the history
* Refactor: 포인트 컨슈머 스레드 명칭을 좀 더 간결하게 변경

* Refactor: 포인트 캐시 업데이트 함수도 결국엔 도메인 비즈니스 로직으로 해당 계층에 맞게 네이밍 변경

* Refactor: 비동기 스레드 리소스 문제로 인한 오류 발생 해결을 위해 Async 코드 삭제 및 예약 요청시 응답시간을 늘리기위해 좌석 상태 변경을 cache write-through 패턴으로 변경

* Refactor: 응답시간 개선을 위해 예약을 위해 콘서트 조회 부분에 cache-aside 패턴 적용

* Feat: DB 작업 병목으로 인한 api 응답속도 개선을 위해 예약 저장 및 좌석 상태 변경 관련 DB 작업은 EventBroker에게 위임

* Test: 변경된 예약 요청에 맞게 테스트코드 수정

* Refactor: 아키텍처 설계의도에 맞게 service 인터페이스 함수 리팩토링 이후 사용하지 않는 함수 삭제

* Refactor: 포인트 결제와 pg 결제 승인(mock) 분리이후 사용하지 않는 함수 삭제

* Refactor: 예약 도메인 리팩토링 과정에서 사용하지 않게된 dto 클래스 삭제

* Test: 리팩토링 이후 테스트코드 정리
  • Loading branch information
wanniDev authored Jun 6, 2024
1 parent d92491f commit 4988a13
Show file tree
Hide file tree
Showing 23 changed files with 244 additions and 144 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package io.ticketaka.api.common.infrastructure.event

import io.ticketaka.api.common.domain.DomainEvent
import io.ticketaka.api.concert.infrastructure.event.ReservationCreateEventConsumer
import io.ticketaka.api.point.domain.PointChargeEvent
import io.ticketaka.api.point.domain.PointRechargeEvent
import io.ticketaka.api.point.infrastructure.event.PointChargeEventConsumer
import io.ticketaka.api.point.infrastructure.event.PointRechargeEventConsumer
import io.ticketaka.api.reservation.domain.reservation.ReservationCreateEvent
import org.springframework.stereotype.Component

@Component
class EventDispatcher(
private val pointRechargeEventConsumer: PointRechargeEventConsumer,
private val pointChargeEventConsumer: PointChargeEventConsumer,
private val reservationCreateEventConsumer: ReservationCreateEventConsumer,
) {
fun dispatch(event: DomainEvent) {
when (event) {
Expand All @@ -20,6 +23,9 @@ class EventDispatcher(
is PointChargeEvent -> {
pointChargeEventConsumer.offer(event)
}
is ReservationCreateEvent -> {
reservationCreateEventConsumer.offer(event)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,64 +1,20 @@
package io.ticketaka.api.concert.application

import io.ticketaka.api.common.exception.BadClientRequestException
import io.ticketaka.api.concert.application.dto.SeatResult
import io.ticketaka.api.concert.domain.Concert
import io.ticketaka.api.concert.domain.ConcertRepository
import io.ticketaka.api.concert.domain.Seat
import io.ticketaka.api.concert.domain.SeatRepository
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.LocalDate

@Service
class ConcertSeatService(
private val concertCacheAsideQueryService: ConcertCacheAsideQueryService,
private val seatRepository: SeatRepository,
private val concertRepository: ConcertRepository,
) {
fun getSeatNumbers(date: LocalDate): List<SeatResult> {
val concert = concertCacheAsideQueryService.getConcert(date)
return concertCacheAsideQueryService.getConcertSeatNumbers(concert.id).map { SeatResult(it.number, it.status) }
}

@Transactional(readOnly = true)
fun getAvailableConcert(date: LocalDate): Concert {
val concert =
concertRepository.findByDate(date)
?: throw BadClientRequestException("해당 날짜의 콘서트가 없습니다.")
return concert
}

@Transactional(readOnly = true)
fun getAvailableSeats(
date: LocalDate,
seatNumbers: List<String>,
): Set<Seat> {
val concert =
concertRepository.findByDate(date)
?: throw BadClientRequestException("해당 날짜의 콘서트가 없습니다.")
val seats = seatRepository.findSeatsByConcertDateAndNumberInOrderByNumber(concert.date, seatNumbers)
seats.forEach { seat ->
if (seat.status != Seat.Status.AVAILABLE) {
throw BadClientRequestException("이미 예약된 좌석입니다.")
}
}
return seats
}

@Transactional
fun reserveSeat(
concertId: Long,
seatNumbers: List<String>,
): Set<Seat> {
val seats =
seatRepository.findSeatsByConcertIdAndNumberInOrderByNumberForUpdate(concertId, seatNumbers)
seats.forEach { seat ->
seat.reserve()
}
return seats
}

fun getDates(): List<LocalDate> {
return concertRepository.findAllDate().sorted()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.ticketaka.api.concert.domain

import java.time.LocalDate

interface ConcertSeatUpdater {
fun reserve(
concertId: Long,
date: LocalDate,
seatNumbers: List<String>,
): Set<Seat>

fun confirm(
concertId: Long,
date: String,
seatNumbers: List<String>,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ interface SeatRepository {
numbers: List<String>,
): Set<Seat>

fun findByIdsOrderByNumberForUpdate(ids: List<Long>): Set<Seat>

fun findSeatsByConcertDateAndNumberInOrderByNumber(
date: LocalDate,
numbers: List<String>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.ticketaka.api.concert.infrastructure

import io.ticketaka.api.common.exception.NotFoundException
import io.ticketaka.api.concert.domain.ConcertSeatUpdater
import io.ticketaka.api.concert.domain.Seat
import io.ticketaka.api.concert.domain.SeatRepository
import org.springframework.cache.caffeine.CaffeineCacheManager
import org.springframework.stereotype.Component
import java.time.LocalDate

@Component
class InMemoryCacheConcertSeatUpdater(
private val caffeineCacheManager: CaffeineCacheManager,
private val seatRepository: SeatRepository,
) : ConcertSeatUpdater {
override fun reserve(
concertId: Long,
date: LocalDate,
seatNumbers: List<String>,
): Set<Seat> {
val cache = caffeineCacheManager.getCache("seatNumbers") ?: throw NotFoundException("콘서트별 좌석 캐시가 존재하지 않습니다.")
synchronized(cache) {
var seats = cache.get(concertId) { setOf<Seat>() } as Set<Seat>
if (seats.isEmpty()) {
seats = seatRepository.findByConcertId(concertId)
}
seats.sortedBy { it.number }
.filter { seatNumbers.contains(it.number) }
.forEach { it.reserve() }
cache.put(concertId, seats)
return seats.filter { seatNumbers.contains(it.number) }.toSet()
}
}

override fun confirm(
concertId: Long,
date: String,
seatNumbers: List<String>,
) {
println("Confirm concert seat")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.ticketaka.api.concert.infrastructure.event

import io.ticketaka.api.concert.domain.SeatRepository
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

@Component
class DBSeatStatusManger(
private val seatRepository: SeatRepository,
) {
@Transactional
fun reserve(seatIds: List<Long>) {
val seats = seatRepository.findByIdsOrderByNumberForUpdate(seatIds)
seats.forEach { it.reserve() }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.ticketaka.api.concert.infrastructure.event

import io.ticketaka.api.reservation.domain.reservation.Reservation
import io.ticketaka.api.reservation.domain.reservation.ReservationCreateEvent
import io.ticketaka.api.reservation.domain.reservation.ReservationRepository
import io.ticketaka.api.reservation.domain.reservation.ReservationSeatAllocator
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.util.StopWatch
import java.util.concurrent.LinkedBlockingDeque
import kotlin.concurrent.thread

@Component
class ReservationCreateEventConsumer(
private val reservationRepository: ReservationRepository,
private val reservationSeatAllocator: ReservationSeatAllocator,
private val seatStatusManger: DBSeatStatusManger,
) {
private val logger = LoggerFactory.getLogger(javaClass)
private val eventQueue = LinkedBlockingDeque<ReservationCreateEvent>()

init {
startEventConsumer()
}

fun consume(events: MutableList<ReservationCreateEvent>) {
events.forEach { event ->
seatStatusManger.reserve(event.seatIds)
val reservation =
reservationRepository.save(
Reservation.createPendingReservation(
userId = event.userId,
concertId = event.concertId,
),
)
reservationSeatAllocator.allocate(
reservationId = reservation.id,
seatIds = event.seatIds,
)
}
}

fun offer(event: ReservationCreateEvent) {
eventQueue.add(event)
}

private fun startEventConsumer() {
thread(
start = true,
isDaemon = true,
name = "reservationEventConsumer",
) {
while (true) {
val stopWatch = StopWatch()
stopWatch.start()
var processingTime = 1000L
val currentThread = Thread.currentThread()
while (currentThread.state.name == Thread.State.WAITING.name) {
logger.info(currentThread.state.name)
Thread.sleep(processingTime)
}
if (eventQueue.isNotEmpty()) {
val events = mutableListOf<ReservationCreateEvent>()
var quantity = 1000
while (eventQueue.isNotEmpty().and(quantity > 0)) {
quantity--
eventQueue.poll()?.let { events.add(it) }
}
consume(events)
stopWatch.stop()
processingTime = stopWatch.totalTimeMillis
logger.debug("reservationEventConsumer consume ${events.size} events, cost ${processingTime}ms")
} else {
Thread.sleep(5000)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ interface JpaSeatRepository : JpaRepository<Seat, Long> {
seatNumbers: List<String>,
): List<Seat>

@Lock(LockModeType.PESSIMISTIC_WRITE)
fun findByIdInOrderByNumber(ids: List<Long>): List<Seat>

@Lock(LockModeType.PESSIMISTIC_WRITE)
fun findSeatsByConcertDateAndNumberInOrderByNumber(
date: LocalDate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class SeatRepositoryComposition(
return jpaSeatRepository.findSeatsByConcertDateAndNumberIn(date, numbers).toSet()
}

override fun findByIdsOrderByNumberForUpdate(ids: List<Long>): Set<Seat> {
return jpaSeatRepository.findByIdInOrderByNumber(ids).toSet()
}

override fun findSeatsByConcertDateAndNumberInOrderByNumber(
date: LocalDate,
numbers: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,17 @@ package io.ticketaka.api.point.application
import io.ticketaka.api.point.application.dto.PaymentCommand
import io.ticketaka.api.point.domain.payment.Payment
import io.ticketaka.api.point.domain.payment.PaymentRepository
import org.springframework.context.ApplicationEventPublisher
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Propagation
import org.springframework.transaction.annotation.Transactional

@Service
@Transactional(readOnly = true)
class PaymentService(
private val paymentRepository: PaymentRepository,
private val pointCacheAsideQueryService: PointCacheAsideQueryService,
private val applicationEventPublisher: ApplicationEventPublisher,
) {
@Transactional
fun paymentApproval(paymentCommand: PaymentCommand) {
Thread.sleep((500..1000).random().toLong()) // PG 승인 요청 시간 대기
paymentRepository.save(Payment.newInstance(paymentCommand.amount, paymentCommand.userId, paymentCommand.pointId))
}

@Async
@Transactional(propagation = Propagation.NESTED)
fun paymentApprovalAsync(paymentCommand: PaymentCommand) {
try {
Thread.sleep((500..1000).random().toLong()) // PG 승인 요청 시간 대기
val payment = Payment.newInstance(paymentCommand.amount, paymentCommand.userId, paymentCommand.pointId)
paymentRepository.save(payment)
payment.pollAllEvents().forEach { applicationEventPublisher.publishEvent(it) }
} catch (e: Exception) {
e.printStackTrace()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.ticketaka.api.point.application
import io.ticketaka.api.common.exception.NotFoundException
import io.ticketaka.api.point.application.dto.BalanceQueryModel
import io.ticketaka.api.point.application.dto.RechargeCommand
import io.ticketaka.api.point.domain.PointBalanceCacheUpdater
import io.ticketaka.api.point.domain.PointBalanceUpdater
import io.ticketaka.api.point.domain.PointRechargeEvent
import io.ticketaka.api.point.domain.PointRepository
import io.ticketaka.api.user.application.TokenUserCacheAsideQueryService
Expand All @@ -15,15 +15,15 @@ import org.springframework.transaction.annotation.Transactional
class PointService(
private val tokenUserCacheAsideQueryService: TokenUserCacheAsideQueryService,
private val pointCacheAsideQueryService: PointCacheAsideQueryService,
private val pointBalanceCacheUpdater: PointBalanceCacheUpdater,
private val pointBalanceUpdater: PointBalanceUpdater,
private val applicationEventPublisher: ApplicationEventPublisher,
private val pointRepository: PointRepository,
) {
@Transactional
fun recharge(rechargeCommand: RechargeCommand) {
val user = tokenUserCacheAsideQueryService.getUser(rechargeCommand.userId)
val point = pointCacheAsideQueryService.getPoint(user.pointId)
pointBalanceCacheUpdater.recharge(point.id, rechargeCommand.amount)
pointBalanceUpdater.recharge(point.id, rechargeCommand.amount)
applicationEventPublisher.publishEvent(PointRechargeEvent(user.id, point.id, rechargeCommand.amount))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.ticketaka.api.point.domain

import java.math.BigDecimal

interface PointBalanceCacheUpdater {
interface PointBalanceUpdater {
fun recharge(
pointId: Long,
amount: BigDecimal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package io.ticketaka.api.point.infrastructure

import io.ticketaka.api.common.exception.NotFoundException
import io.ticketaka.api.point.domain.Point
import io.ticketaka.api.point.domain.PointBalanceCacheUpdater
import io.ticketaka.api.point.domain.PointBalanceUpdater
import org.springframework.cache.caffeine.CaffeineCacheManager
import org.springframework.stereotype.Component
import java.math.BigDecimal

@Component
class InMemoryPointBalanceCacheUpdater(
class InMemoryCachePointBalanceUpdater(
private val caffeineCacheManager: CaffeineCacheManager,
) : PointBalanceCacheUpdater {
) : PointBalanceUpdater {
override fun recharge(
pointId: Long,
amount: BigDecimal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.ticketaka.api.point.application.PointService
import io.ticketaka.api.point.domain.PointHistory
import io.ticketaka.api.point.domain.PointHistoryRepository
import io.ticketaka.api.point.domain.PointRechargeEvent
import io.ticketaka.api.point.domain.PointRepository
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.util.StopWatch
Expand All @@ -13,7 +12,6 @@ import kotlin.concurrent.thread

@Component
class PointRechargeEventConsumer(
private val pointRepository: PointRepository,
private val pointHistoryRepository: PointHistoryRepository,
private val pointService: PointService,
) {
Expand Down Expand Up @@ -49,7 +47,7 @@ class PointRechargeEventConsumer(
thread(
start = true,
isDaemon = true,
name = "PointRechargeEventConsumer",
name = "pointEventConsumer",
) {
while (true) {
val stopWatch = StopWatch()
Expand Down
Loading

0 comments on commit 4988a13

Please sign in to comment.