Skip to content

Commit

Permalink
Merge pull request #16426 from lichess-org/relay-finishedAt
Browse files Browse the repository at this point in the history
Relay finished at
  • Loading branch information
ornicar authored Nov 17, 2024
2 parents dac363a + 94c9608 commit e2fb7da
Show file tree
Hide file tree
Showing 15 changed files with 59 additions and 56 deletions.
8 changes: 2 additions & 6 deletions app/controllers/RelayRound.scala
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,7 @@ final class RelayRound(
}(Unauthorized, Forbidden)

def stats(id: RelayRoundId) = Open:
env.relay.stats
.get(id)
.map: stats =>
import lila.relay.JsonView.given
JsonOk(stats)
env.relay.statsJson(id).map(JsonOk)

private def WithRoundAndTour(
@nowarn ts: String,
Expand Down Expand Up @@ -274,7 +270,7 @@ final class RelayRound(
case VideoEmbed.Auto =>
fuccess:
rt.tour.pinnedStream
.ifFalse(rt.round.finished)
.ifFalse(rt.round.isFinished)
.flatMap(_.upstream)
.map(_.urls(netDomain).toPair)
case VideoEmbed.No => fuccess(none)
Expand Down
6 changes: 6 additions & 0 deletions bin/mongodb/relay-round-finishedAt.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
db.relay.find({ finished: true, finishedAt: { $exists: false } }).forEach(function(relay) {
const startAt = relay.startedAt || relay.startsAt || relay.createdAt;
const duration = 1000 * 60 * 60 * 3; // 3 hours
const finishAt = new Date(startAt.getTime() + duration);
db.relay.updateOne({ _id: relay._id }, { $set: { finishedAt: finishAt } });
});
1 change: 1 addition & 0 deletions modules/common/src/main/mon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ object mon:
private def relay(official: Boolean, id: RelayTourId, slug: String) =
tags("by" -> by(official), "slug" -> s"$slug/$id")
def ongoing(official: Boolean) = gauge("relay.ongoing").withTag("by", by(official))
val crowdMonitor = gauge("relay.crowdMonitor").withoutTags()
def games(official: Boolean, id: RelayTourId, slug: String) =
gauge("relay.games").withTags(relay(official, id, slug))
def moves(official: Boolean, id: RelayTourId, slug: String) =
Expand Down
6 changes: 6 additions & 0 deletions modules/db/src/main/dsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ trait dsl:
"$unset" -> fields.nonEmpty.so($doc(fields.map(k => (k, BSONString("")))))
def $unset(field: String, fields: String*): Bdoc = $doc:
"$unset" -> $doc((Seq(field) ++ fields).map(k => (k, BSONString(""))))

def $unsetCompute[A](prev: A, next: A, fields: (String, A => Option[?])*): Bdoc =
$unset:
fields.flatMap: (key, accessor) =>
(accessor(prev).isDefined && accessor(next).isEmpty).option(key)

def $setBoolOrUnset(field: String, value: Boolean): Bdoc =
if value then $set(field -> true) else $unset(field)
def $setsAndUnsets(items: (String, Option[BSONValue])*): Bdoc =
Expand Down
6 changes: 4 additions & 2 deletions modules/relay/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ final class Env(

lazy val listing: RelayListing = wire[RelayListing]

lazy val stats = wire[RelayStatsApi]

lazy val api: RelayApi = wire[RelayApi]

lazy val tourStream: RelayTourStream = wire[RelayTourStream]
Expand Down Expand Up @@ -101,6 +99,10 @@ final class Env(

private lazy val delay = wire[RelayDelay]

// eager init to start the scheduler
private val stats = wire[RelayStatsApi]
export stats.{ getJson as statsJson }

import SettingStore.CredentialsOption.given
val proxyCredentials = settingStore[Option[Credentials]](
"relayProxyCredentials",
Expand Down
9 changes: 5 additions & 4 deletions modules/relay/src/main/JsonView.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,15 @@ object JsonView:
"slug" -> r.slug,
"createdAt" -> r.createdAt
)
.add("finished" -> r.finished)
.add("ongoing" -> (r.hasStarted && !r.finished))
.add("finishedAt" -> r.finishedAt)
.add("finished" -> r.isFinished) // BC
.add("ongoing" -> (r.hasStarted && !r.isFinished))
.add("startsAt" -> r.startsAtTime.orElse(r.startedAt))
.add("startsAfterPrevious" -> r.startsAfterPrevious)

given OWrites[RelayStats.RoundStats] = OWrites: r =>
def statsJson(stats: RelayStats.RoundStats) =
Json.obj(
"viewers" -> r.viewers.map: (minute, crowd) =>
"viewers" -> stats.viewers.map: (minute, crowd) =>
Json.arr(minute * 60, crowd)
)

Expand Down
6 changes: 3 additions & 3 deletions modules/relay/src/main/RelayApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,12 @@ final class RelayApi(
round <- copyRoundSourceSettings(updated)
_ <- (from.name != round.name).so(studyApi.rename(round.studyId, round.name.into(StudyName)))
setters <- tryBdoc(round).toEither.toFuture
unsetters = (from.caption.isDefined && updated.caption.isEmpty).option("caption").toList
_ <- roundRepo.coll.update.one($id(round.id), $set(setters) ++ $unset(unsetters)).void
unsets = $unsetCompute(from, updated, ("caption", _.caption), ("finishedAt", _.finishedAt))
_ <- roundRepo.coll.update.one($id(round.id), $set(setters) ++ unsets).void
_ <- (round.sync.playing != from.sync.playing)
.so(sendToContributors(round.id, "relaySync", jsonView.sync(round)))
_ <- denormalizeTour(round.tourId)
nextRoundToStart <- round.finished.so(nextRoundThatStartsAfterThisOneCompletes(round))
nextRoundToStart <- round.isFinished.so(nextRoundThatStartsAfterThisOneCompletes(round))
_ <- nextRoundToStart.so(next => requestPlay(next.id, v = true))
yield
round.sync.log.events.lastOption
Expand Down
6 changes: 3 additions & 3 deletions modules/relay/src/main/RelayFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ final private class RelayFetch(
nbGamesFinished > nbGamesUnstarted
noMoreGamesSelected = games.isEmpty && allGamesInSource.nonEmpty
autoFinishNow = rt.round.hasStarted && (allGamesFinishedOrUnstarted || noMoreGamesSelected)
roundUpdate = updating:
_.withSync(_.addLog(SyncLog.event(res.nbMoves, none)))
.copy(finished = autoFinishNow)
roundUpdate = updating: r =>
r.withSync(_.addLog(SyncLog.event(res.nbMoves, none)))
.copy(finishedAt = r.finishedAt.orElse(autoFinishNow.option(nowInstant)))
yield res -> roundUpdate
syncFu
.recover:
Expand Down
4 changes: 2 additions & 2 deletions modules/relay/src/main/RelayListing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ final class RelayListing(
yield
spotlightCache = active
.filter(_.tour.spotlight.exists(_.enabled))
.filterNot(_.display.finished)
.filterNot(_.display.isFinished)
.filter: tr =>
tr.display.hasStarted || tr.display.startsAtTime.exists(_.isBefore(nowInstant.plusMinutes(30)))
active
Expand Down Expand Up @@ -256,7 +256,7 @@ private object RelayListing:
.match
case None => trs.rounds.headOption
case Some(last) =>
trs.rounds.find(!_.finished) match
trs.rounds.find(!_.isFinished) match
case None => last.some
case Some(next) =>
if next.startsAtTime.exists(_.isBefore(nowInstant.plusHours(1)))
Expand Down
9 changes: 4 additions & 5 deletions modules/relay/src/main/RelayPush.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import lila.study.{ ChapterPreviewApi, MultiPgn, StudyPgnImport }
final class RelayPush(
sync: RelaySync,
api: RelayApi,
stats: RelayStatsApi,
chapterPreview: ChapterPreviewApi,
fidePlayers: RelayFidePlayerApi,
playerEnrich: RelayPlayerEnrich,
Expand Down Expand Up @@ -61,13 +60,13 @@ final class RelayPush(
case e: Exception => SyncLog.event(0, e.some)
_ = if !rt.round.hasStarted && !rt.tour.official && event.hasMoves then
irc.broadcastStart(rt.round.id, rt.fullName)
_ = stats.setActive(rt.round.id)
allGamesFinished <- (games.nonEmpty && games.forall(_.points.isDefined)).so:
chapterPreview.dataList(rt.round.studyId).map(_.forall(_.finished))
round <- api.update(rt.round): r1 =>
val r2 = r1.withSync(_.addLog(event))
val r3 = if event.hasMoves then r2.ensureStarted.resume(rt.tour.official) else r2
r3.copy(finished = allGamesFinished)
val r2 = r1.withSync(_.addLog(event))
val r3 = if event.hasMoves then r2.ensureStarted.resume(rt.tour.official) else r2
val finishedAt = allGamesFinished.option(r3.finishedAt.|(nowInstant))
r3.copy(finishedAt = finishedAt)
_ <- andSyncTargets.so(api.syncTargetsOfSource(round))
yield ()

Expand Down
9 changes: 6 additions & 3 deletions modules/relay/src/main/RelayRound.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,33 @@ case class RelayRound(
startedAt: Option[Instant],
/* at least it *looks* finished... but maybe it's not
* sync.nextAt is used for actually synchronising */
finished: Boolean,
finishedAt: Option[Instant],
createdAt: Instant,
crowd: Option[Int]
// crowdAt: Option[Instant], // in DB but not used by RelayRound
):
inline def studyId = id.into(StudyId)

lazy val slug =
val s = scalalib.StringOps.slug(name.value)
if s.isEmpty then "-" else s

def isFinished = finishedAt.isDefined

def startsAtTime = startsAt.flatMap:
case RelayRound.Starts.At(at) => at.some
case _ => none
def startsAfterPrevious = startsAt.contains(RelayRound.Starts.AfterPrevious)

def finish =
copy(
finished = true,
finishedAt = finishedAt.orElse(nowInstant.some),
sync = sync.pause
)

def resume(official: Boolean) =
copy(
finished = false,
finishedAt = none,
sync = sync.play(official)
)

Expand Down
6 changes: 3 additions & 3 deletions modules/relay/src/main/RelayRoundForm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ object RelayRoundForm:
caption = if Granter(_.StudyAdmin) then caption else relay.caption,
sync = if relay.sync.playing then sync.play(official) else sync,
startsAt = relayStartsAt,
finished = ~finished
finishedAt = finished.orZero.option(relay.finishedAt.|(nowInstant))
)

private def makeSync(prev: Option[RelayRound.Sync])(using Me): Sync =
Expand All @@ -241,7 +241,7 @@ object RelayRoundForm:
sync = makeSync(none),
createdAt = nowInstant,
crowd = none,
finished = ~finished,
finishedAt = (~finished).option(nowInstant),
startsAt = relayStartsAt,
startedAt = none
)
Expand All @@ -267,7 +267,7 @@ object RelayRoundForm:
case ids: Upstream.Ids => ids,
startsAt = relay.startsAtTime,
startsAfterPrevious = relay.startsAfterPrevious.option(true),
finished = relay.finished.option(true),
finished = relay.isFinished.option(true),
period = relay.sync.period,
onlyRound = relay.sync.onlyRound,
slices = relay.sync.slices,
Expand Down
27 changes: 9 additions & 18 deletions modules/relay/src/main/RelayStatsApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import scalalib.cache.ExpireSetMemo

import lila.db.dsl.{ *, given }

object RelayStats:
private object RelayStats:
type Minute = Int
type Crowd = Int
type Graph = List[(Minute, Crowd)]
case class RoundStats(viewers: Graph)

final class RelayStatsApi(colls: RelayColls)(using scheduler: Scheduler)(using
private final class RelayStatsApi(colls: RelayColls)(using scheduler: Scheduler)(using
Executor
):
import RelayStats.*
Expand All @@ -28,10 +28,7 @@ final class RelayStatsApi(colls: RelayColls)(using scheduler: Scheduler)(using
.toList
.map(RoundStats.apply)

def setActive(id: RelayRoundId) = activeRounds.put(id)

// keep monitoring rounds for some time after they stopped syncing
private val activeRounds = ExpireSetMemo[RelayRoundId](2 hours)
def getJson(id: RelayRoundId) = get(id).map(JsonView.statsJson)

private def record(): Funit = for
crowds <- fetchRoundCrowds
Expand Down Expand Up @@ -63,26 +60,20 @@ final class RelayStatsApi(colls: RelayColls)(using scheduler: Scheduler)(using
yield ()

private def fetchRoundCrowds: Fu[List[(RelayRoundId, Crowd)]] =
val max = 500
val max = 200
colls.round
.aggregateList(maxDocs = max, _.sec): framework =>
import framework.*
Match(
$doc(
$or(
$doc("sync.until" -> $exists(true)),
$inIds(activeRounds.keys)
),
"crowd".$gt(0)
)
) ->
List(Project($doc("_id" -> 1, "crowd" -> 1, "syncing" -> "$sync.until")))
// lila-ws sets crowdAt along with crowd
// so we can use crowdAt to know which rounds are being monitored
Match($doc("crowdAt".$gt(nowInstant.minusMinutes(1)))) ->
List(Project($doc("_id" -> 1, "crowd" -> 1)))
.map: docs =>
lila.mon.relay.crowdMonitor.update(docs.size)
if docs.size == max
then logger.warn(s"RelayStats.fetchRoundCrowds: $max docs fetched")
for
doc <- docs
id <- doc.getAsOpt[RelayRoundId]("_id")
crowd <- doc.getAsOpt[Crowd]("crowd")
_ = if doc.contains("syncing") then activeRounds.put(id)
yield (id, crowd)
2 changes: 1 addition & 1 deletion modules/relay/src/main/ui/RelayFormUi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ final class RelayFormUi(helpers: Helpers, ui: RelayUi, tourUi: RelayTourUi):
href := routes.RelayRound.edit(r.id),
cls := List("subnav__subitem text" -> true, "active" -> nav.roundId.has(r.id)),
dataIcon := (
if r.finished then Icon.Checkmark
if r.isFinished then Icon.Checkmark
else if r.hasStarted then Icon.DiscBig
else Icon.DiscOutline
)
Expand Down
10 changes: 4 additions & 6 deletions modules/tournament/src/main/TournamentStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ final class TournamentStatsApi(

private given BSONDocumentHandler[TournamentStats] = Macros.handler

private val cache = mongoCache[TourId, TournamentStats](64, "tournament:stats", 60 days, _.value) {
loader =>
_.expireAfterAccess(10 minutes)
.maximumSize(256)
.buildAsyncFuture(loader(fetch))
}
private val cache = mongoCache[TourId, TournamentStats](64, "tournament:stats", 60 days, _.value): loader =>
_.expireAfterAccess(10 minutes)
.maximumSize(256)
.buildAsyncFuture(loader(fetch))

private def fetch(tournamentId: TourId): Fu[TournamentStats] =
for
Expand Down

0 comments on commit e2fb7da

Please sign in to comment.