Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: avoid concurrent writes to jena #1577

Merged
merged 23 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions acceptance-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ services {
projects-tokens {
db-port = 49998
}
triples-generator-db {
db-port = 49997
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,56 @@

package io.renku.graph.acceptancetests.db

import cats.effect.{IO, Resource, Temporal}
import cats.{Applicative, Monad}
import cats.effect.{IO, Temporal}
import com.dimafeng.testcontainers.FixedHostPortGenericContainer
import eu.timepit.refined.auto._
import io.renku.db.{DBConfigProvider, PostgresContainer}
import io.renku.triplesgenerator.TgLockDB.SessionResource
import io.renku.triplesgenerator.{TgLockDB, TgLockDbConfigProvider}
import io.renku.triplesstore._
import natchez.Trace.Implicits.noop
import org.typelevel.log4cats.Logger
import skunk.Session

import scala.concurrent.duration._
import scala.util.Try

object TriplesStore extends InMemoryJena with ProjectsDataset with MigrationsDataset {

protected override val jenaRunMode: JenaRunMode = JenaRunMode.FixedPortContainer(3030)

private lazy val dbConfig: DBConfigProvider.DBConfig[TgLockDB] =
eikek marked this conversation as resolved.
Show resolved Hide resolved
new TgLockDbConfigProvider[Try].get().fold(throw _, identity)

// Postgres container for the triples-generator lock DB. The same port number taken from
// `dbConfig` is used on the host side, on the container side and in the `-p` command argument.
private lazy val postgresContainer = {
  val port: Int = dbConfig.port.value
  val credentials: Map[String, String] = Map(
    "POSTGRES_USER"     -> dbConfig.user.value,
    "POSTGRES_PASSWORD" -> dbConfig.pass.value,
    "POSTGRES_DB"       -> dbConfig.name.value
  )

  FixedHostPortGenericContainer(
    imageName = PostgresContainer.image,
    env = credentials,
    exposedPorts = Seq(port),
    exposedHostPort = port,
    exposedContainerPort = port,
    command = Seq(s"-p $port")
  )
}

// Pooled skunk sessions against the Postgres container, wrapped into a `SessionResource`.
// Connection settings (port, database, credentials, pool size) all come from `dbConfig`.
lazy val sessionResource: Resource[IO, SessionResource[IO]] = {
  val pool = Session.pooled[IO](
    host = postgresContainer.host,
    port = dbConfig.port.value,
    database = dbConfig.name.value,
    user = dbConfig.user.value,
    password = Some(dbConfig.pass.value),
    max = dbConfig.connectionPool.value
  )

  pool.map(sessions => new SessionResource(sessions))
}
// Starts the Jena container and the Postgres container (each only if it is not running yet),
// waits until the triples store reports as running and then logs that it has started.
def start()(implicit logger: Logger[IO]): IO[Unit] = {
  val startJena = Applicative[IO].unlessA(isRunning)(IO(container.start()))

  // a `def` so that the running-check is evaluated only after the Jena step has completed
  def startPostgres =
    Applicative[IO].unlessA(postgresContainer.container.isRunning)(IO(postgresContainer.start()))

  startJena >> startPostgres >> waitForReadiness >> logger.info("Triples Store started")
}
Expand All @@ -40,4 +76,5 @@ object TriplesStore extends InMemoryJena with ProjectsDataset with MigrationsDat

// Polls `isRunning`; while it is false, logs a message and sleeps 500ms before checking again.
private def waitForReadiness(implicit logger: Logger[IO]): IO[Unit] = {
  val notRunningYet: IO[Boolean] = IO(!isRunning)

  def announceAndPause: IO[Unit] =
    logger.info("Waiting for TS") >> Temporal[IO].sleep(500.millis)

  Monad[IO].whileM_(notRunningYet)(announceAndPause)
}

}
7 changes: 7 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ lazy val triplesGenerator = project
.in(file("triples-generator"))
.withId("triples-generator")
.settings(commonSettings)
.settings(
reStart / envVars := Map(
"JENA_RENKU_PASSWORD" -> "renku",
"JENA_ADMIN_PASSWORD" -> "renku",
"RENKU_URL" -> "http://localhost:3000"
)
)
.dependsOn(
triplesGeneratorApi % "compile->compile; test->test",
entitiesSearch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class SessionResource[F[_]: MonadCancelThrow, TargetDB](resource: Resource[F, Se
): F[ResultType] = resource.use { session =>
session.transaction.use(transaction => query.run((transaction, session)))
}

/** The session resource this instance was constructed with, exposed unchanged for callers that need a
  * plain `Session`.
  */
val session: Resource[F, Session[F]] = resource
}

object SessionPoolResource {
Expand Down
76 changes: 76 additions & 0 deletions graph-commons/src/main/scala/io/renku/lock/Lock.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2023 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.lock

import cats.Functor
import cats.data.Kleisli
import cats.effect._
import cats.effect.std.Mutex
import cats.syntax.all._
import fs2.Stream

import scala.concurrent.duration.FiniteDuration

/** Constructors for `Lock`s: functions from a key to a `Resource` whose acquisition obtains the
  * lock for that key and whose finalizer releases it again.
  */
object Lock {

  /** A lock that does nothing: acquiring and releasing are no-ops for every key. */
  def none[F[_], A]: Lock[F, A] =
    Kleisli(_ => Resource.unit)

  /** A lock backed by one single mutex; the key is ignored, so all keys exclude each other. */
  def global[F[_]: Async, A]: F[Lock[F, A]] =
    Mutex[F].map(mutex => Kleisli(_ => mutex.lock))

  /** An in-memory lock holding one mutex per key; different keys do not block each other. */
  def memory[F[_]: Async, A]: F[Lock[F, A]] =
    memory0[F, A].map(_._2)

  /** Bookkeeping for the in-memory lock.
    *
    * @param mutexes the mutex currently associated with each key
    * @param users   per key, the number of callers that hold or are waiting for its mutex
    */
  private final case class MemoryState[F[_], A](mutexes: Map[A, Mutex[F]], users: Map[A, Int]) {

    /** Registers one more user for `key`, installing `fresh` if the key has no mutex yet.
      * Returns the new state together with the mutex to be used for the key.
      */
    def register(key: A, fresh: Mutex[F]): (MemoryState[F, A], Mutex[F]) = {
      val mutex = mutexes.getOrElse(key, fresh)
      val count = users.getOrElse(key, 0) + 1
      (MemoryState[F, A](mutexes.updated(key, mutex), users.updated(key, count)), mutex)
    }

    /** Removes one user of `key`; the mutex is dropped only once nobody uses it anymore. */
    def unregister(key: A): MemoryState[F, A] =
      users.getOrElse(key, 0) - 1 match {
        case remaining if remaining > 0 => copy(users = users.updated(key, remaining))
        case _                          => MemoryState[F, A](mutexes.removed(key), users.removed(key))
      }
  }

  /** Same as [[memory]], additionally returning a view on the key -> mutex map.
    *
    * The mutex of a key must stay in the map as long as any caller holds it or waits for it.
    * Removing it as soon as one holder releases would let a later caller install a fresh mutex
    * and hold the lock concurrently with a caller that was still queued on the old one. Therefore
    * users are counted per key and the entry is removed when the last user is gone.
    */
  private[lock] def memory0[F[_]: Async, A]: F[(Ref[F, Map[A, Mutex[F]]], Lock[F, A])] =
    Ref.of[F, MemoryState[F, A]](MemoryState[F, A](Map.empty, Map.empty)).map { state =>
      def set(k: A): Resource[F, Unit] =
        Resource.eval(Mutex[F]).flatMap { fresh =>
          // registration is the outer resource: it is undone after the mutex was unlocked, and
          // also when waiting for the mutex gets cancelled
          Resource
            .make(state.modify(_.register(k, fresh)))(_ => state.update(_.unregister(k)))
            .flatMap(_.lock)
        }

      val mutexes: Ref[F, Map[A, Mutex[F]]] =
        Ref.lens[F, MemoryState[F, A], Map[A, Mutex[F]]](state)(_.mutexes, s => m => s.copy(mutexes = m))

      (mutexes, Kleisli(set))
    }

  /** Creates a lock from a non-blocking `acquire` that reports whether the lock was obtained.
    *
    * `acquire` is run repeatedly, sleeping `interval` after each unsuccessful attempt, until it
    * returns `true`.
    *
    * NOTE(review): the polling runs as the acquire action of `Resource.make` (see [[from]]), which
    * presumably makes waiting for the lock uncancelable - confirm whether that is intended.
    *
    * @param acquire  tries to obtain the lock for a key, returning `true` on success
    * @param interval pause between two attempts
    * @param release  releases the lock for a key
    */
  def create[F[_]: Temporal, A](acquire: Kleisli[F, A, Boolean], interval: FiniteDuration)(
      release: Kleisli[F, A, Unit]
  ): Lock[F, A] = {
    val acq: Kleisli[F, A, Unit] = Kleisli { key =>
      (Stream.eval(acquire.run(key)) ++ Stream.sleep(interval).drain).repeat
        .find(identity)
        .compile
        .drain
    }

    from(acq)(release)
  }

  /** Creates a lock from an `acquire` that completes once the lock is held and a `release`. */
  def from[F[_]: Functor, A](acquire: Kleisli[F, A, Unit])(release: Kleisli[F, A, Unit]): Lock[F, A] =
    Kleisli(key => Resource.make(acquire.run(key))(_ => release.run(key)))
}
53 changes: 53 additions & 0 deletions graph-commons/src/main/scala/io/renku/lock/LongKey.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2023 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.lock

import io.renku.graph.model.entities.Project
import io.renku.graph.model.projects

import scala.util.hashing.MurmurHash3

/** Type class turning a value of type `A` into a `Long`. */
trait LongKey[A] { self =>

  /** The `Long` representation of `a`. */
  def asLong(a: A): Long

  /** Derives an instance for `B` by first converting a `B` into an `A` using `f`. */
  def contramap[B](f: B => A): LongKey[B] =
    new LongKey[B] {
      def asLong(b: B): Long = self.asLong(f(b))
    }
}

object LongKey {

  /** Summons the implicit instance for `A`. */
  def apply[A](implicit lk: LongKey[A]): LongKey[A] = lk

  /** Builds an instance from a plain function. */
  def create[A](f: A => Long): LongKey[A] =
    new LongKey[A] {
      def asLong(a: A): Long = f(a)
    }

  /** A `Long` is its own key. */
  implicit val forLong: LongKey[Long] =
    create(long => long)

  /** An `Int` is widened to `Long`. */
  implicit val forInt: LongKey[Int] =
    create(int => int.toLong)

  /** A `String` is reduced to its (32 bit) `MurmurHash3.stringHash`, widened to `Long`. */
  implicit val forString: LongKey[String] =
    create(str => MurmurHash3.stringHash(str).toLong)

  /** A project path is keyed by its string value. */
  implicit val forProjectPath: LongKey[projects.Path] =
    forString.contramap(path => path.value)

  /** A project is keyed by its path. */
  implicit val forProject: LongKey[Project] =
    forProjectPath.contramap(project => project.path)
}
121 changes: 121 additions & 0 deletions graph-commons/src/main/scala/io/renku/lock/PostgresLock.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2023 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.lock

import cats.Applicative
import cats.data.Kleisli
import cats.effect._
import cats.syntax.all._
import org.typelevel.log4cats.Logger
import skunk._
import skunk.codec.all._
import skunk.implicits._

import scala.concurrent.duration._

/** Locks based on postgres' per session advisory locks.
  *
  * Postgres requires the key to be of type bigint, so callers must provide a `LongKey` instance
  * to do the conversion. All variants obtain the lock by polling the non-blocking `pg_try_*`
  * functions; while a lock cannot be obtained, a record is written via `PostgresLockStats`, which
  * is removed again once the lock was acquired.
  */
object PostgresLock {

  /** Obtains an exclusive lock on the given session, retrying periodically via non-blocking waits.
    *
    * @param session  the session used for locking, unlocking and writing lock stats
    * @param interval pause between two attempts to obtain the lock
    */
  def exclusive_[F[_]: Temporal: Logger, A: LongKey](
      session:  Session[F],
      interval: FiniteDuration = 0.5.seconds
  ): Lock[F, A] =
    createPolling[F, A](session, interval, tryAdvisoryLockSql, advisoryUnlockSql)

  /** Obtains an exclusive lock, retrying periodically via non-blocking waits.
    *
    * A session is obtained from the given resource for every lock and stays allocated until that
    * lock has been released.
    */
  def exclusive[F[_]: Temporal: Logger, A: LongKey](
      session:  Resource[F, Session[F]],
      interval: FiniteDuration = 0.5.seconds
  ): Lock[F, A] =
    Kleisli(key => session.flatMap(exclusive_[F, A](_, interval).run(key)))

  /** Obtains a shared lock on the given session, retrying periodically via non-blocking waits.
    *
    * @param session  the session used for locking, unlocking and writing lock stats
    * @param interval pause between two attempts to obtain the lock
    */
  def shared_[F[_]: Temporal: Logger, A: LongKey](
      session:  Session[F],
      interval: FiniteDuration = 0.5.seconds
  ): Lock[F, A] =
    createPolling(session, interval, tryAdvisoryLockSharedSql, advisoryUnlockSharedSql)

  /** Obtains a shared lock, retrying periodically via non-blocking waits.
    *
    * A session is obtained from the given resource for every lock and stays allocated until that
    * lock has been released.
    */
  def shared[F[_]: Temporal: Logger, A: LongKey](
      session:  Resource[F, Session[F]],
      interval: FiniteDuration = 0.5.seconds
  ): Lock[F, A] =
    Kleisli(key => session.flatMap(shared_[F, A](_, interval).run(key)))

  /** Builds a polling lock from a "try lock" and an "unlock" statement run on `session`. */
  private def createPolling[F[_]: Temporal: Logger, A: LongKey](
      session:   Session[F],
      interval:  FiniteDuration,
      lockSql:   Query[Long, Boolean],
      unlockSql: Query[Long, Boolean]
  ): Lock[F, A] = {

    // Lock stats are best effort only: a failure is logged and must not affect the locking itself.
    def bestEffort(update: F[Unit], failureMsg: => String): F[Unit] =
      update.attempt.flatMap(ignoreError[F](failureMsg))

    // A failing attempt is logged and treated like "lock not obtained", so it gets retried.
    def acq(n: Long): F[Boolean] =
      session
        .unique(lockSql)(n)
        .handleErrorWith { ex =>
          Logger[F].warn(ex)(s"Acquiring postgres advisory lock failed! Retry in $interval.").as(false)
        }
        .flatTap {
          case false =>
            bestEffort(PostgresLockStats.recordWaiting(session)(n), s"Failed to write lock stats record for key=$n")
          case true =>
            bestEffort(PostgresLockStats.removeWaiting(session)(n), s"Failed to remove lock stats record for key=$n")
        }

    // The unlock functions return false if the session did not hold the lock. That indicates a
    // bookkeeping problem (e.g. the lock was taken on another session), so don't drop it silently.
    def rel(n: Long): F[Unit] =
      session.unique(unlockSql)(n).flatMap {
        case true  => ().pure[F]
        case false => Logger[F].warn(s"Releasing postgres advisory lock for key=$n had no effect: it was not held by this session.")
      }

    Lock
      .create(Kleisli(acq), interval)(Kleisli(rel))
      .local(LongKey[A].asLong)
  }

  /** Logs the error of a failed best-effort operation with `msg` instead of propagating it. */
  private def ignoreError[F[_]: Logger: Applicative](msg: => String)(eab: Either[Throwable, Unit]): F[Unit] =
    eab match {
      case Right(_) => ().pure[F]
      case Left(ex) => Logger[F].error(ex)(msg)
    }

  // Codec for skunk's `Void` using the postgres `void` type.
  // NOTE(review): not used within this object; presumably needed for functions returning void - confirm.
  implicit val void: Codec[Void] =
    Codec.simple[Void](_ => "null", _ => Right(Void), skunk.data.Type.void)

  private def tryAdvisoryLockSql: Query[Long, Boolean] =
    sql"SELECT pg_try_advisory_lock($int8)".query(bool)

  private def tryAdvisoryLockSharedSql: Query[Long, Boolean] =
    sql"SELECT pg_try_advisory_lock_shared($int8)".query(bool)

  private def advisoryUnlockSql: Query[Long, Boolean] =
    sql"SELECT pg_advisory_unlock($int8)".query(bool)

  private def advisoryUnlockSharedSql: Query[Long, Boolean] =
    sql"SELECT pg_advisory_unlock_shared($int8)".query(bool)
}
Loading