diff --git a/acceptance-tests/src/test/resources/application.conf b/acceptance-tests/src/test/resources/application.conf index 0425bbf085..e16bfd441e 100644 --- a/acceptance-tests/src/test/resources/application.conf +++ b/acceptance-tests/src/test/resources/application.conf @@ -47,7 +47,3 @@ services { api-url = "http://localhost:9004/knowledge-graph" } } - -projects-tokens { - db-port = 49998 -} diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/CommitHistoryChangesSpec.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/CommitHistoryChangesSpec.scala index 08e316f3ac..e23513c87f 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/CommitHistoryChangesSpec.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/CommitHistoryChangesSpec.scala @@ -86,7 +86,7 @@ class CommitHistoryChangesSpec sleep((10 seconds).toMillis) - `wait for events to be processed`(project.id, user.accessToken) + `wait for events to be processed`(project.id, user.accessToken, 5) eventually { EventLog.findEvents(project.id, events.EventStatus.TriplesStore).toSet shouldBe newCommits.toList.toSet diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/CommitSyncFlowsSpec.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/CommitSyncFlowsSpec.scala index 577c8f29c0..ed8cec6c27 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/CommitSyncFlowsSpec.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/CommitSyncFlowsSpec.scala @@ -69,7 +69,7 @@ class CommitSyncFlowsSpec extends AcceptanceSpec with ApplicationServices with T .status shouldBe Accepted And("relevant commit events are processed") - `wait for events to be processed`(project.id, user.accessToken) + `wait for events to be processed`(project.id, user.accessToken, 0) Then("the non missed events should be in the Triples Store") eventually { @@ -83,7 +83,7 @@ class CommitSyncFlowsSpec extends AcceptanceSpec with ApplicationServices with T EventLog.forceCategoryEventTriggering(CategoryName("COMMIT_SYNC"), project.id) And("commit events for the missed event are created and processed") - `wait for events to be processed`(project.id, user.accessToken) + `wait for events to be processed`(project.id, user.accessToken, 5) Then("triples for both of the project's commits should be in the Triples Store") eventually { diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/EventFlowsSpec.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/EventFlowsSpec.scala index 7d4fbf8b45..7dac3e43ba 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/EventFlowsSpec.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/EventFlowsSpec.scala @@ -59,7 +59,7 @@ class EventFlowsSpec extends AcceptanceSpec with ApplicationServices with TSProv .status shouldBe Accepted And("commit events are processed") - `wait for events to be processed`(project.id, user.accessToken) + `wait for events to be processed`(project.id, user.accessToken, 5) Then(s"all the events should get the $TriplesStore status in the Event Log") EventLog.findEvents(project.id).map(_._2).toSet shouldBe Set(TriplesStore) @@ -89,7 +89,7 @@ class EventFlowsSpec extends AcceptanceSpec with ApplicationServices with TSProv .status shouldBe Accepted And("relevant commit events are processed") - `wait for events to be processed`(project.id, user.accessToken) + `wait for events to be processed`(project.id, user.accessToken, 5) And(s"all the events should get the $GenerationNonRecoverableFailure status in the Event Log") EventLog.findEvents(project.id).map(_._2).toSet shouldBe Set(GenerationNonRecoverableFailure) @@ -151,7 +151,7 @@ class EventFlowsSpec extends AcceptanceSpec with ApplicationServices with TSProv .status shouldBe Accepted And("relevant commit events are processed") - `wait for events to be processed`(project.id, user.accessToken) + `wait for events to be processed`(project.id, user.accessToken, 5) Then(s"all the events should get the $TransformationNonRecoverableFailure status in the Event Log") EventLog.findEvents(project.id).map(_._2).toSet shouldBe Set(TransformationNonRecoverableFailure) diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/ProjectReProvisioningSpec.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/ProjectReProvisioningSpec.scala index 449b488a33..987f564e28 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/ProjectReProvisioningSpec.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/ProjectReProvisioningSpec.scala @@ -87,7 +87,7 @@ class ProjectReProvisioningSpec extends AcceptanceSpec with ApplicationServices Then("the old data in the TS should be replaced with the new") sleep((10 seconds).toMillis) - `wait for events to be processed`(project.id, user.accessToken) + `wait for events to be processed`(project.id, user.accessToken, 5) eventually { knowledgeGraphClient diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/EventLog.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/EventLog.scala index bcd7d72063..8b38ad0baf 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/EventLog.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/EventLog.scala @@ -18,26 +18,22 @@ package io.renku.graph.acceptancetests.db -import cats.Applicative import cats.data.{Kleisli, NonEmptyList} import cats.effect.IO._ import cats.effect.unsafe.IORuntime import cats.effect.{IO, Resource} import cats.syntax.all._ -import com.dimafeng.testcontainers.FixedHostPortGenericContainer -import io.renku.db.{DBConfigProvider, PostgresContainer, SessionResource} +import io.renku.db.{DBConfigProvider, SessionResource} import io.renku.eventlog._ import io.renku.events.CategoryName import io.renku.graph.model.events.{CommitId, EventId, EventStatus} import io.renku.graph.model.projects import io.renku.graph.model.projects.GitLabId -import natchez.Trace.Implicits.noop import org.typelevel.log4cats.Logger import skunk._ import skunk.codec.text.varchar import skunk.implicits._ -import scala.collection.immutable import scala.util.Try object EventLog extends TypeSerializers { @@ -95,18 +91,6 @@ object EventLog extends TypeSerializers { private lazy val dbConfig: DBConfigProvider.DBConfig[EventLogDB] = new EventLogDbConfigProvider[Try].get().fold(throw _, identity) - private lazy val postgresContainer = FixedHostPortGenericContainer( - imageName = PostgresContainer.image, - env = immutable.Map("POSTGRES_USER" -> dbConfig.user.value, - "POSTGRES_PASSWORD" -> dbConfig.pass.value, - "POSTGRES_DB" -> dbConfig.name.value - ), - exposedPorts = Seq(dbConfig.port.value), - exposedHostPort = dbConfig.port.value, - exposedContainerPort = dbConfig.port.value, - command = Seq(s"-p ${dbConfig.port.value}") - ) - def removeGlobalCommitSyncRow(projectId: GitLabId)(implicit ioRuntime: IORuntime): Unit = execute { session => val command: Command[projects.GitLabId] = sql""" DELETE FROM subscription_category_sync_time @@ -116,19 +100,11 @@ object EventLog extends TypeSerializers { } def startDB()(implicit logger: Logger[IO]): IO[Unit] = for { - _ <- Applicative[IO].unlessA(postgresContainer.container.isRunning)(IO(postgresContainer.start())) + _ <- PostgresDB.startPostgres + _ <- PostgresDB.initializeDatabase(dbConfig) _ <- logger.info("event_log DB started") } yield () private lazy val sessionResource: Resource[IO, SessionResource[IO, EventLogDB]] = - Session - .pooled( - host = postgresContainer.host, - port = dbConfig.port.value, - database = dbConfig.name.value, - user = dbConfig.user.value, - password = Some(dbConfig.pass.value), - max = dbConfig.connectionPool.value - ) - .map(new SessionResource(_)) + PostgresDB.sessionPoolResource(dbConfig) } diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/PostgresDB.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/PostgresDB.scala new file mode 100644 index 0000000000..c5ed6e4a4c --- /dev/null +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/PostgresDB.scala @@ -0,0 +1,131 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.graph.acceptancetests.db + +import cats.effect._ +import com.dimafeng.testcontainers.FixedHostPortGenericContainer +import eu.timepit.refined.auto._ +import io.renku.db.DBConfigProvider.DBConfig +import io.renku.db.{PostgresContainer, SessionResource} +import skunk.codec.all._ +import skunk.implicits._ +import skunk._ +import natchez.Trace.Implicits.noop +import org.typelevel.log4cats.Logger +import scala.concurrent.duration._ + +object PostgresDB { + private[this] val starting: Ref[IO, Boolean] = Ref.unsafe[IO, Boolean](false) + + private val dbConfig = DBConfig[Any]( + host = "localhost", + port = 5432, + name = "postgres", + user = "at", + pass = "at", + connectionPool = 8 + ) + + private val postgresContainer = { + val cfg = dbConfig + FixedHostPortGenericContainer( + imageName = PostgresContainer.image, + env = Map( + "POSTGRES_USER" -> cfg.user.value, + "POSTGRES_PASSWORD" -> cfg.pass.value + ), + exposedPorts = Seq(cfg.port.value), + exposedHostPort = cfg.port.value, + exposedContainerPort = cfg.port.value, + command = Seq(s"-p ${cfg.port.value}") + ) + } + + def startPostgres(implicit L: Logger[IO]) = + starting.getAndUpdate(_ => true).flatMap { + case false => + IO.unlessA(postgresContainer.container.isRunning)( + IO(postgresContainer.start()) *> waitForPostgres *> L.info( + "PostgreSQL database started" + ) + ) + + case true => + waitForPostgres + } + + private def waitForPostgres = + fs2.Stream + .awakeDelay[IO](0.5.seconds) + .evalMap { _ => + Session + .single[IO]( + host = dbConfig.host, + port = dbConfig.port, + user = dbConfig.user.value, + password = Some(dbConfig.pass.value), + database = dbConfig.name.value + ) + .use(_.unique(sql"SELECT 1".query(int4))) + .attempt + } + .map(_.fold(_ => 1, _ => 0)) + .take(100) + .find(_ == 0) + .compile + .drain + + def sessionPool(dbCfg: DBConfig[_]): Resource[IO, Resource[IO, Session[IO]]] = + Session + .pooled[IO]( + host = postgresContainer.host, + port = dbConfig.port.value, + database = dbCfg.name.value, + user = dbCfg.user.value, + password = Some(dbCfg.pass.value), + max = dbConfig.connectionPool.value + ) + + def sessionPoolResource[A](dbCfg: DBConfig[_]): Resource[IO, SessionResource[IO, A]] = + sessionPool(dbCfg).map(new SessionResource[IO, A](_)) + + def initializeDatabase(cfg: DBConfig[_]): IO[Unit] = { + val session = Session.single[IO]( + host = dbConfig.host, + port = dbConfig.port, + user = dbConfig.user.value, + password = Some(dbConfig.pass.value), + database = dbConfig.name.value + ) + + // note: it would be simpler to use the default user that is created with the container, but that requires + // to first refactor how the configuration is loaded. Currently services load it from the file every time and so + // this creates the users as expected from the given config + val createRole: Command[Void] = + sql"create role #${cfg.user.value} with password '#${cfg.pass.value}' superuser login".command + val createDatabase: Command[Void] = sql"create database #${cfg.name.value}".command + val grants: Command[Void] = sql"grant all on database #${cfg.name.value} to #${cfg.user.value}".command + + session.use { s => + s.execute(createRole) *> + s.execute(createDatabase) *> + s.execute(grants) + }.void + } +} diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TokenRepository.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TokenRepository.scala index eb034a111f..171c09f2df 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TokenRepository.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TokenRepository.scala @@ -18,11 +18,8 @@ package io.renku.graph.acceptancetests.db -import cats.Applicative import cats.effect.IO -import cats.effect.unsafe.IORuntime -import com.dimafeng.testcontainers.FixedHostPortGenericContainer -import io.renku.db.{DBConfigProvider, PostgresContainer} +import io.renku.db.DBConfigProvider import io.renku.tokenrepository.repository.{ProjectsTokensDB, ProjectsTokensDbConfigProvider} import org.typelevel.log4cats.Logger @@ -33,20 +30,9 @@ object TokenRepository { private lazy val dbConfig: DBConfigProvider.DBConfig[ProjectsTokensDB] = new ProjectsTokensDbConfigProvider[Try].get().fold(throw _, identity) - private lazy val postgresContainer = FixedHostPortGenericContainer( - imageName = PostgresContainer.image, - env = Map("POSTGRES_USER" -> dbConfig.user.value, - "POSTGRES_PASSWORD" -> dbConfig.pass.value, - "POSTGRES_DB" -> dbConfig.name.value - ), - exposedPorts = Seq(dbConfig.port.value), - exposedHostPort = dbConfig.port.value, - exposedContainerPort = dbConfig.port.value, - command = Seq(s"-p ${dbConfig.port.value}") - ) - - def startDB()(implicit ioRuntime: IORuntime, logger: Logger[IO]): IO[Unit] = for { - _ <- Applicative[IO].unlessA(postgresContainer.container.isRunning)(IO(postgresContainer.start())) + def startDB()(implicit logger: Logger[IO]): IO[Unit] = for { + _ <- PostgresDB.startPostgres + _ <- PostgresDB.initializeDatabase(dbConfig) _ <- logger.info("projects_tokens DB started") } yield () } diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TriplesStore.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TriplesStore.scala index 81ea5e25af..7d403ca8e1 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TriplesStore.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TriplesStore.scala @@ -18,20 +18,32 @@ package io.renku.graph.acceptancetests.db +import cats.effect.{IO, Resource, Temporal} import cats.{Applicative, Monad} -import cats.effect.{IO, Temporal} import eu.timepit.refined.auto._ +import io.renku.db.DBConfigProvider +import io.renku.triplesgenerator.TgLockDB.SessionResource +import io.renku.triplesgenerator.{TgLockDB, TgLockDbConfigProvider} import io.renku.triplesstore._ import org.typelevel.log4cats.Logger import scala.concurrent.duration._ +import scala.util.Try object TriplesStore extends InMemoryJena with ProjectsDataset with MigrationsDataset { protected override val jenaRunMode: JenaRunMode = JenaRunMode.FixedPortContainer(3030) + private val dbConfig: DBConfigProvider.DBConfig[TgLockDB] = + new TgLockDbConfigProvider[Try].get().fold(throw _, identity) + + lazy val sessionResource: Resource[IO, SessionResource[IO]] = + PostgresDB.sessionPoolResource(dbConfig) + def start()(implicit logger: Logger[IO]): IO[Unit] = for { _ <- Applicative[IO].unlessA(isRunning)(IO(container.start())) + _ <- PostgresDB.startPostgres + _ <- PostgresDB.initializeDatabase(dbConfig) _ <- waitForReadiness _ <- logger.info("Triples Store started") } yield () @@ -40,4 +52,5 @@ object TriplesStore extends InMemoryJena with ProjectsDataset with MigrationsDat private def waitForReadiness(implicit logger: Logger[IO]): IO[Unit] = Monad[IO].whileM_(IO(!isRunning))(logger.info("Waiting for TS") >> (Temporal[IO] sleep (500 millis))) + } diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/flows/TSProvisioning.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/flows/TSProvisioning.scala index 0306202084..2bc23bd9e2 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/flows/TSProvisioning.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/flows/TSProvisioning.scala @@ -20,6 +20,7 @@ package io.renku.graph.acceptancetests.flows import cats.data.NonEmptyList import cats.effect.unsafe.IORuntime +import cats.syntax.all._ import io.renku.events.CategoryName import io.renku.generators.Generators.Implicits._ import io.renku.graph.acceptancetests.data @@ -74,15 +75,20 @@ trait TSProvisioning sleep((5 seconds).toMillis) } - `wait for events to be processed`(project.id, accessToken) + `wait for events to be processed`(project.id, accessToken, 5) } - def `wait for events to be processed`(projectId: projects.GitLabId, accessToken: AccessToken): Assertion = + def `wait for events to be processed`( + projectId: projects.GitLabId, + accessToken: AccessToken, + expectedMinTotal: Int + ): Assertion = eventually { val response = webhookServiceClient.`GET projects/:id/events/status`(projectId, accessToken) response.status shouldBe Ok response.jsonBody.hcursor.downField("activated").as[Boolean].value shouldBe true response.jsonBody.hcursor.downField("progress").downField("percentage").as[Double].value shouldBe 100d + response.jsonBody.hcursor.downField("progress").downField("total").as[Int].value should be >= expectedMinTotal } def `check hook cannot be found`(projectId: projects.GitLabId, accessToken: AccessToken): Assertion = eventually { diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/knowledgegraph/DatasetsResourcesSpec.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/knowledgegraph/DatasetsResourcesSpec.scala index bab0de0c83..6773d78e9d 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/knowledgegraph/DatasetsResourcesSpec.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/knowledgegraph/DatasetsResourcesSpec.scala @@ -492,7 +492,7 @@ class DatasetsResourcesSpec gitLabStub.setupProject(project, commitId) mockCommitDataOnTripleGenerator(project, toPayloadJsonLD(project), commitId) `data in the Triples Store`(project, commitId, creator.accessToken) - `wait for events to be processed`(project.id, creator.accessToken) + `wait for events to be processed`(project.id, creator.accessToken, 5) When("an authenticated and authorised user fetches dataset details through GET knowledge-graph/datasets/:id") val detailsResponse = diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/testing/AcceptanceTestPatience.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/testing/AcceptanceTestPatience.scala index 6b164d0a83..82e59c3b3d 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/testing/AcceptanceTestPatience.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/testing/AcceptanceTestPatience.scala @@ -24,7 +24,7 @@ import org.scalatest.time.{Minutes, Second, Span} trait AcceptanceTestPatience extends AbstractPatienceConfiguration { implicit override val patienceConfig: PatienceConfig = PatienceConfig( - timeout = scaled(Span(4, Minutes)), + timeout = scaled(Span(6, Minutes)), interval = scaled(Span(1, Second)) ) } diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/tooling/ServicesClients.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/tooling/ServicesClients.scala index 86e2b15474..d5d28b2eed 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/tooling/ServicesClients.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/tooling/ServicesClients.scala @@ -25,10 +25,12 @@ import cats.syntax.all._ import eu.timepit.refined.api.Refined import eu.timepit.refined.auto._ import eu.timepit.refined.string.Url -import io.circe.Json +import io.circe.generic.semiauto.deriveDecoder +import io.circe.{Decoder, Json} import io.circe.literal._ import io.renku.control.Throttler import io.renku.graph.acceptancetests.tooling.ServiceClient.ClientResponse +import io.renku.graph.model.events.{EventId, EventStatus} import io.renku.graph.model.projects import io.renku.http.client.{AccessToken, BasicAuthCredentials, RestClient} import io.renku.http.tinytypes.TinyTypeURIEncoder._ @@ -120,6 +122,21 @@ object KnowledgeGraphClient { } object EventLogClient { + final case class ProjectEvent( + id: EventId, + project: ProjectEvent.Project, + status: EventStatus + ) + object ProjectEvent { + import io.renku.tinytypes.json.TinyTypeDecoders._ + + final case class Project(id: projects.GitLabId, path: projects.Path) + object Project { + implicit val jsonDecoder: Decoder[Project] = deriveDecoder + } + + implicit val jsonDecoder: Decoder[ProjectEvent] = deriveDecoder + } def apply()(implicit logger: Logger[IO]): EventLogClient = new EventLogClient @@ -138,6 +155,19 @@ object EventLogClient { } yield () }.unsafeRunSync() + def getEvents(project: Either[projects.GitLabId, projects.Path]): IO[List[ProjectEvent]] = + for { + uri <- validateUri(s"$baseUrl/events").map(uri => + project.fold(id => uri.withQueryParam("project-id", id.value), + path => uri.withQueryParam("project-path", path.value) + ) + ) + req = request(Method.GET, uri) + r <- send(req) { case (Status.Ok, _, resp) => + resp.as[List[ProjectEvent]] + } + } yield r + private def createRequest(uri: Uri, event: Json) = request(Method.POST, uri).withMultipartBuilder .addPart("event", event) diff --git a/build.sbt b/build.sbt index 9a004f819e..8dee2571ac 100644 --- a/build.sbt +++ b/build.sbt @@ -177,6 +177,13 @@ lazy val triplesGenerator = project .in(file("triples-generator")) .withId("triples-generator") .settings(commonSettings) + .settings( + reStart / envVars := Map( + "JENA_RENKU_PASSWORD" -> "renku", + "JENA_ADMIN_PASSWORD" -> "renku", + "RENKU_URL" -> "http://localhost:3000" + ) + ) .dependsOn( triplesGeneratorApi % "compile->compile; test->test", entitiesSearch, diff --git a/graph-commons/src/main/scala/io/renku/db/SessionResource.scala b/graph-commons/src/main/scala/io/renku/db/SessionResource.scala index 370e3696e2..fbf997f73e 100644 --- a/graph-commons/src/main/scala/io/renku/db/SessionResource.scala +++ b/graph-commons/src/main/scala/io/renku/db/SessionResource.scala @@ -37,6 +37,8 @@ class SessionResource[F[_]: MonadCancelThrow, TargetDB](resource: Resource[F, Se ): F[ResultType] = resource.use { session => session.transaction.use(transaction => query.run((transaction, session))) } + + val session: Resource[F, Session[F]] = resource } object SessionPoolResource { diff --git a/graph-commons/src/main/scala/io/renku/lock/Lock.scala b/graph-commons/src/main/scala/io/renku/lock/Lock.scala new file mode 100644 index 0000000000..343c720d55 --- /dev/null +++ b/graph-commons/src/main/scala/io/renku/lock/Lock.scala @@ -0,0 +1,76 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.lock + +import cats.Functor +import cats.data.Kleisli +import cats.effect._ +import cats.effect.std.Mutex +import cats.syntax.all._ +import fs2.Stream + +import scala.concurrent.duration.FiniteDuration + +object Lock { + + def none[F[_], A]: Lock[F, A] = + Kleisli(_ => Resource.unit) + + def global[F[_]: Async, A]: F[Lock[F, A]] = + Mutex[F].map(mutex => Kleisli(_ => mutex.lock)) + + def memory[F[_]: Async, A]: F[Lock[F, A]] = + memory0[F, A].map(_._2) + + private[lock] def memory0[F[_]: Async, A]: F[(Ref[F, Map[A, Mutex[F]]], Lock[F, A])] = + Ref.of[F, Map[A, Mutex[F]]](Map.empty[A, Mutex[F]]).map { data => + def set(k: A) = + Resource.eval(Mutex[F]).flatMap { newMutex => + val next = + Resource.eval( + data.updateAndGet(m => + m.updatedWith(k) { + case r @ Some(_) => r + case None => Some(newMutex) + } + ) + ) + + next.flatMap(_(k).lock.onFinalize(data.update(_.removed(k)))) + } + + (data, Kleisli(set)) + } + + def create[F[_]: Temporal, A](acquire: Kleisli[F, A, Boolean], interval: FiniteDuration)( + release: Kleisli[F, A, Unit] + ): Lock[F, A] = { + val acq: Kleisli[F, A, Unit] = Kleisli { key => + (Stream.eval(acquire.run(key)) ++ Stream.sleep(interval).drain).repeat + .find(_ == true) + .compile + .drain + } + + from(acq)(release) + } + + def from[F[_]: Functor, A](acquire: Kleisli[F, A, Unit])(release: Kleisli[F, A, Unit]): Lock[F, A] = + Kleisli(key => Resource.make(acquire.run(key))(_ => release.run(key))) +} diff --git a/graph-commons/src/main/scala/io/renku/lock/LongKey.scala b/graph-commons/src/main/scala/io/renku/lock/LongKey.scala new file mode 100644 index 0000000000..5042d64f8f --- /dev/null +++ b/graph-commons/src/main/scala/io/renku/lock/LongKey.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.lock + +import io.renku.graph.model.entities.Project +import io.renku.graph.model.projects + +import scala.util.hashing.MurmurHash3 + +trait LongKey[A] { self => + def asLong(a: A): Long + + def contramap[B](f: B => A): LongKey[B] = + LongKey.create(b => self.asLong(f(b))) +} + +object LongKey { + def apply[A](implicit lk: LongKey[A]): LongKey[A] = lk + + def create[A](f: A => Long): LongKey[A] = + (a: A) => f(a) + + implicit val forLong: LongKey[Long] = + create(identity) + + implicit val forInt: LongKey[Int] = + create(_.toLong) + + implicit val forString: LongKey[String] = + forInt.contramap(MurmurHash3.stringHash) + + implicit val forProjectPath: LongKey[projects.Path] = + forString.contramap(_.value) + + implicit val forProject: LongKey[Project] = + forProjectPath.contramap(_.path) +} diff --git a/graph-commons/src/main/scala/io/renku/lock/PostgresLock.scala b/graph-commons/src/main/scala/io/renku/lock/PostgresLock.scala new file mode 100644 index 0000000000..375609e9f9 --- /dev/null +++ b/graph-commons/src/main/scala/io/renku/lock/PostgresLock.scala @@ -0,0 +1,121 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.lock + +import cats.Applicative +import cats.data.Kleisli +import cats.effect._ +import cats.syntax.all._ +import org.typelevel.log4cats.Logger +import skunk._ +import skunk.codec.all._ +import skunk.implicits._ + +import scala.concurrent.duration._ + +/** Uses postgres' per session advisory locks. It requires the key to be of type bigint, so + * callers must provide a `LongKey` instance to do the conversion. + */ +object PostgresLock { + + /** Obtains an exclusive lock, retrying periodically via non-blocking waits */ + def exclusive_[F[_]: Temporal: Logger, A: LongKey]( + session: Session[F], + interval: FiniteDuration = 0.5.seconds + ): Lock[F, A] = + createPolling[F, A](session, interval, tryAdvisoryLockSql, advisoryUnlockSql) + + /** Obtains an exclusive lock, retrying periodically via non-blocking waits */ + def exclusive[F[_]: Temporal: Logger, A: LongKey]( + session: Resource[F, Session[F]], + interval: FiniteDuration = 0.5.seconds + ): Lock[F, A] = + Kleisli(key => session.flatMap(exclusive_[F, A](_, interval).run(key))) + + /** Obtains a shared lock, retrying periodically via non-blocking waits. */ + def shared_[F[_]: Temporal: Logger, A: LongKey]( + session: Session[F], + interval: FiniteDuration = 0.5.seconds + ): Lock[F, A] = + createPolling(session, interval, tryAdvisoryLockSharedSql, advisoryUnlockSharedSql) + + /** Obtains a shared lock, retrying periodically via non-blocking waits. */ + def shared[F[_]: Temporal: Logger, A: LongKey]( + session: Resource[F, Session[F]], + interval: FiniteDuration = 0.5.seconds + ): Lock[F, A] = + Kleisli(key => session.flatMap(shared_[F, A](_, interval).run(key))) + + private def createPolling[F[_]: Temporal: Logger, A: LongKey]( + session: Session[F], + interval: FiniteDuration, + lockSql: Query[Long, Boolean], + unlockSql: Query[Long, Boolean] + ): Lock[F, A] = { + def acq(n: Long): F[Boolean] = + session + .unique(lockSql)(n) + .attempt + .flatMap { + case Right(v) => v.pure[F] + case Left(ex) => Logger[F].warn(ex)(s"Acquiring postgres advisory lock failed! Retry in $interval.").as(false) + } + .flatTap { + case false => + PostgresLockStats + .recordWaiting(session)(n) + .attempt + .flatMap(ignoreError[F](s"Failed to write lock stats record for key=$n")) + case true => + PostgresLockStats + .removeWaiting(session)(n) + .attempt + .flatMap(ignoreError[F](s"Failed to remove lock stats record for key=$n")) + } + + def rel(n: Long): F[Unit] = + session.unique(unlockSql)(n).void + + Lock + .create(Kleisli(acq), interval)(Kleisli(rel)) + .local(LongKey[A].asLong) + } + + private def ignoreError[F[_]: Logger: Applicative](msg: => String)(eab: Either[Throwable, Unit]): F[Unit] = + eab match { + case Right(_) => ().pure[F] + case Left(ex) => Logger[F].error(ex)(msg) + } + + // how to avoid that boilerplate??? + implicit val void: Codec[Void] = + Codec.simple[Void](_ => "null", _ => Right(Void), skunk.data.Type.void) + + private def tryAdvisoryLockSql: Query[Long, Boolean] = + sql"SELECT pg_try_advisory_lock($int8)".query(bool) + + private def tryAdvisoryLockSharedSql: Query[Long, Boolean] = + sql"SELECT pg_try_advisory_lock_shared($int8)".query(bool) + + private def advisoryUnlockSql: Query[Long, Boolean] = + sql"SELECT pg_advisory_unlock($int8)".query(bool) + + private def advisoryUnlockSharedSql: Query[Long, Boolean] = + sql"SELECT pg_advisory_unlock_shared($int8)".query(bool) +} diff --git a/graph-commons/src/main/scala/io/renku/lock/PostgresLockStats.scala b/graph-commons/src/main/scala/io/renku/lock/PostgresLockStats.scala new file mode 100644 index 0000000000..fa5848b606 --- /dev/null +++ b/graph-commons/src/main/scala/io/renku/lock/PostgresLockStats.scala @@ -0,0 +1,94 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.lock + +import cats._ +import cats.effect._ +import cats.syntax.all._ +import skunk._ +import skunk.codec.all._ +import skunk.implicits._ + +import java.time.{Duration, OffsetDateTime} + +object PostgresLockStats { + + final case class Waiting( + objectId: Long, + pid: Long, + createdAt: OffsetDateTime, + waitDuration: Duration + ) + + final case class Stats( + currentLocks: Long, + waiting: List[Waiting] + ) + + def getStats[F[_]: Monad](session: Session[F]): F[Stats] = + for { + n <- session.unique(countCurrentLocks) + w <- session.execute(queryWaiting) + } yield Stats(n, w) + + def ensureStatsTable[F[_]: Applicative](session: Session[F]): F[Unit] = + session.execute(createTable).void + + def recordWaiting[F[_]: MonadCancelThrow](session: Session[F])(key: Long): F[Unit] = + session.execute(upsertWaiting)(key).void + + def removeWaiting[F[_]: MonadCancelThrow](session: Session[F])(key: Long): F[Unit] = + session.execute(deleteWaiting)(key).void + + private val countCurrentLocks: Query[Void, Long] = + sql""" + SELECT COUNT(objid) + FROM pg_locks + WHERE locktype = 'advisory' and granted = true; + """ + .query(int8) + + private val queryWaiting: Query[Void, Waiting] = + sql""" + SELECT obj_id, sess_id, created_at, now() - created_at + FROM kg_lock_stats + """ + .query(int8 *: int8 *: timestamptz *: interval) + .to[Waiting] + + private val createTable: Command[Void] = + sql""" + CREATE TABLE IF NOT EXISTS "kg_lock_stats"( + obj_id bigint not null, + sess_id bigint not null, + created_at timestamptz not null, + primary key (obj_id, sess_id) + )""".command + + private val upsertWaiting: Command[Long] = + sql""" + INSERT + INTO kg_lock_stats (obj_id, sess_id, created_at) + VALUES ($int8, pg_backend_pid(), now()) + ON CONFLICT (obj_id, sess_id) DO NOTHING + """.command + + private val deleteWaiting: Command[Long] = + sql"""DELETE FROM kg_lock_stats WHERE obj_id = $int8 AND sess_id = pg_backend_pid()""".command +} diff --git a/graph-commons/src/main/scala/io/renku/lock/package.scala b/graph-commons/src/main/scala/io/renku/lock/package.scala new file mode 100644 index 0000000000..c5d60ad96e --- /dev/null +++ b/graph-commons/src/main/scala/io/renku/lock/package.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku + +import cats.data.Kleisli +import cats.effect._ + +package object lock { + + type Lock[F[_], A] = Kleisli[Resource[F, *], A, Unit] + + object syntax { + final implicit class LockSyntax[F[_], A](self: Lock[F, A]) { + def surround[B](fa: A => F[B])(implicit F: MonadCancelThrow[F]): A => F[B] = + a => self(a).surround(fa(a)) + + def surround[B](k: Kleisli[F, A, B])(implicit F: MonadCancelThrow[F]): Kleisli[F, A, B] = + Kleisli(surround(k.run)) + } + } +} diff --git a/graph-commons/src/main/scala/io/renku/metrics/Histogram.scala b/graph-commons/src/main/scala/io/renku/metrics/Histogram.scala index f03b3dd3d9..dc614e93bd 100644 --- a/graph-commons/src/main/scala/io/renku/metrics/Histogram.scala +++ b/graph-commons/src/main/scala/io/renku/metrics/Histogram.scala @@ -67,6 +67,7 @@ class SingleValueHistogramImpl[F[_]: MonadThrow](val name: String Refined NonEmp trait LabeledHistogram[F[_]] extends Histogram[F] { def startTimer(labelValue: String): F[Histogram.Timer[F]] + def observe(labelValue: String, amt: Double): F[Unit] } object LabeledHistogram { @@ -117,6 +118,11 @@ class LabeledHistogramImpl[F[_]: MonadThrow](val name: String Refined NonEmpty, private val maybeThresholdMillis = maybeThreshold.map(_.toMillis.toDouble) + def observe(labelValue: String, amt: Double): F[Unit] = + MonadThrow[F].catchNonFatal { + wrappedCollector.labels(labelValue).observe(amt) + } + override def startTimer(labelValue: String): F[Histogram.Timer[F]] = MonadThrow[F].catchNonFatal { maybeThresholdMillis match { case None => new metrics.LabeledHistogram.NoThresholdTimerImpl(wrappedCollector.labels(labelValue).startTimer()) diff --git a/graph-commons/src/test/scala/io/renku/lock/MemoryLockSpec.scala b/graph-commons/src/test/scala/io/renku/lock/MemoryLockSpec.scala new file mode 100644 index 0000000000..528caed661 --- /dev/null +++ b/graph-commons/src/test/scala/io/renku/lock/MemoryLockSpec.scala @@ -0,0 +1,94 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.lock + +import cats.effect._ +import cats.effect.std.CountDownLatch +import cats.effect.testing.scalatest.AsyncIOSpec +import cats.syntax.all._ +import org.scalatest.matchers.should +import org.scalatest.wordspec.AsyncWordSpec + +import scala.concurrent.duration._ + +class MemoryLockSpec extends AsyncWordSpec with AsyncIOSpec with should.Matchers { + + def debug(m: String): IO[Unit] = + IO.realTimeInstant.map(t => s"$t : $m").flatMap(IO.println) + + "memory lock" should { + + "sequentially on same key" in { + for { + result <- Ref.of[IO, List[FiniteDuration]](Nil) + update = IO.sleep(200.millis) *> IO.realTime.flatMap(time => result.update(time :: _)) + + lock <- Lock.memory[IO, Int] + latch <- CountDownLatch[IO](1) + + f1 <- Async[IO].start(latch.await *> lock(1).use(_ => update)) + f2 <- Async[IO].start(latch.await *> lock(1).use(_ => update)) + f3 <- Async[IO].start(latch.await *> lock(1).use(_ => update)) + + _ <- latch.release + _ <- List(f1, f2, f3).traverse_(_.join) + + diff <- result.get.map(list => list.max - list.min) + _ = diff should be >= 400.millis + } yield () + } + + "parallel on different key" in { + for { + result <- Ref.of[IO, List[FiniteDuration]](Nil) + update = IO.sleep(200.millis) *> IO.realTime.flatMap(time => result.update(time :: _)) + + lock <- Lock.memory[IO, Int] + latch <- CountDownLatch[IO](1) + + f1 <- Async[IO].start(latch.await *> lock(1).use(_ => update)) + f2 <- Async[IO].start(latch.await *> lock(2).use(_ => update)) + f3 <- Async[IO].start(latch.await *> lock(3).use(_ => update)) + _ <- latch.release + _ <- List(f1, f2, f3).traverse_(_.join) + + diff <- result.get.map(list => list.max - list.min) + _ = diff should be < 300.millis + } yield () + } + } + + "remove mutexes on release" in { + for { + (state, lock) <- Lock.memory0[IO, Int] + latch <- CountDownLatch[IO](1) + + // initial state is empty + _ <- state.get.asserting(_ shouldBe empty) + + // one mutex if active + _ <- Async[IO].start(lock(1).use(_ => latch.await)) + _ <- state.get.asserting(_.size shouldBe 1) + + // once released, mutex must be removed from map + _ <- latch.release + _ <- state.get.asserting(_ shouldBe empty) + } yield () + } +} diff --git a/graph-commons/src/test/scala/io/renku/lock/PostgresLockSpec.scala b/graph-commons/src/test/scala/io/renku/lock/PostgresLockSpec.scala new file mode 100644 index 0000000000..3249c234c9 --- /dev/null +++ b/graph-commons/src/test/scala/io/renku/lock/PostgresLockSpec.scala @@ -0,0 +1,265 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.lock + +import cats.effect._ +import cats.effect.std.CountDownLatch +import cats.effect.testing.scalatest.AsyncIOSpec +import cats.syntax.all._ +import fs2.{Pipe, concurrent} +import io.renku.interpreters.TestLogger +import org.scalatest.matchers.should +import org.scalatest.wordspec.AsyncWordSpec +import org.typelevel.log4cats.Logger +import skunk.data._ +import skunk.net.protocol.{Describe, Parse} +import skunk.util.Typer +import skunk._ +import skunk.implicits._ + +import scala.concurrent.duration._ + +class PostgresLockSpec extends AsyncWordSpec with AsyncIOSpec with should.Matchers with PostgresLockTest { + + "PostgresLock.exclusive" should { + val pollInterval = 100.millis + + "sequentially on same key" in withContainers { cnt => + implicit val logger: Logger[IO] = TestLogger() + val lock = exclusiveLock(cnt, pollInterval) + + for { + result <- Ref.of[IO, List[FiniteDuration]](Nil) + update = IO.sleep(200.millis) *> IO.realTime.flatMap(time => result.update(time :: _)) + latch <- CountDownLatch[IO](1) + + f1 <- Async[IO].start(latch.await *> lock("p1").use(_ => update)) + f2 <- Async[IO].start(latch.await *> lock("p1").use(_ => update)) + f3 <- Async[IO].start(latch.await *> lock("p1").use(_ => update)) + + _ <- latch.release + _ <- List(f1, f2, f3).traverse_(_.join) + + diff <- result.get.map(list => list.max - list.min) + _ = diff should be >= 400.millis + } yield () + } + + "sequentially on same key using session constructor" in withContainers { cnt => + implicit val logger: Logger[IO] = TestLogger() + + def createLock = session(cnt).map(makeExclusiveLock(_, pollInterval)) + + (createLock, createLock, createLock).tupled.use { case (l1, l2, l3) => + for { + result <- Ref.of[IO, List[FiniteDuration]](Nil) + update = IO.sleep(200.millis) *> IO.realTime.flatMap(time => result.update(time :: _)) + latch <- CountDownLatch[IO](1) + + f1 <- Async[IO].start(latch.await *> l1("p1").use(_ => update)) + f2 <- Async[IO].start(latch.await *> l2("p1").use(_ => update)) + f3 <- Async[IO].start(latch.await *> l3("p1").use(_ => update)) + + _ <- latch.release + _ <- List(f1, f2, f3).traverse_(_.join) + + diff <- result.get.map(list => list.max - list.min) + _ = diff should be >= 400.millis + } yield () + } + } + + "parallel on different key" in withContainers { cnt => + implicit val logger: Logger[IO] = TestLogger() + val lock = exclusiveLock(cnt) + + for { + result <- Ref.of[IO, List[FiniteDuration]](Nil) + update = IO.sleep(200.millis) *> IO.realTime.flatMap(time => result.update(time :: _)) + latch <- CountDownLatch[IO](1) + + f1 <- Async[IO].start(latch.await *> lock("p1").use(_ => update)) + f2 <- Async[IO].start(latch.await *> lock("p2").use(_ => update)) + f3 <- Async[IO].start(latch.await *> lock("p3").use(_ => update)) + + _ <- latch.release + _ <- List(f1, f2, f3).traverse_(_.join) + + diff <- result.get.map(list => list.max - list.min) + _ = diff should be < 300.millis + } yield () + } + + "log if acquiring fails" in withContainers { cnt => + implicit val logger: TestLogger[IO] = TestLogger() + + val exception = new Exception("boom") + val makeSession = (Resource.eval(Ref.of[IO, Int](0)), session(cnt)).mapN { (counter, goodSession) => + new PostgresLockSpec.TestSession { + override def unique[A, B](query: Query[A, B])(args: A) = + counter.getAndUpdate(_ + 1).flatMap { + case n if n < 2 => IO.raiseError(exception) + case _ => goodSession.unique(query)(args) + } + + override def execute[A](command: Command[A])(args: A) = + goodSession.execute(command)(args) + } + } + + val interval = 50.millis + def lock = makeSession.map(PostgresLock.exclusive_[IO, String](_, interval)) + + lock.flatMap(_("p1")).use(_ => IO.unit).asserting { _ => + logger.logged( + TestLogger.Level.Warn( + s"Acquiring postgres advisory lock failed! Retry in $interval.", + exception + ) + ) + } + } + } + + "PostgresLock.shared" should { + "allow multiple shared locks" in withContainers { cnt => + implicit val logger: Logger[IO] = TestLogger() + val lock = sharedLock(cnt) + + for { + result <- Ref.of[IO, List[FiniteDuration]](Nil) + update = IO.sleep(200.millis) *> IO.realTime.flatMap(time => result.update(time :: _)) + latch <- CountDownLatch[IO](1) + + f1 <- Async[IO].start(latch.await *> lock("p1").use(_ => update)) + f2 <- Async[IO].start(latch.await *> lock("p1").use(_ => update)) + f3 <- Async[IO].start(latch.await *> lock("p1").use(_ => update)) + + _ <- latch.release + _ <- List(f1, f2, f3).traverse_(_.join) + + diff <- result.get.map(list => list.max - list.min) + _ = diff should be < 300.millis + } yield () + } + } + + "PostgresLock stats" should { + "log if writing/removing stats records fail" in withContainers { cnt => + implicit val logger: TestLogger[IO] = TestLogger() + session(cnt).use { s => + for { + _ <- s.execute(sql"DROP TABLE IF EXISTS kg_lock_stats".command) + (_, release) <- makeExclusiveLock(s).run("1").allocated + _ <- release + + key = LongKey[String].asLong("1") + _ = logger.getMessages(TestLogger.Level.Error).map(_.message) shouldBe List( + s"Failed to remove lock stats record for key=$key" + ) + } yield () + } + } + + "show when a session is waiting for a lock" in withContainers { cnt => + implicit val logger: Logger[IO] = TestLogger() + (session(cnt), session(cnt)).tupled.use { case (s1, s2) => + for { + _ <- resetLockTable(s1) + (_, release) <- makeExclusiveLock(s1, 1.second).run("1").allocated + fiber <- Async[IO].start(makeExclusiveLock(s2).run("1").allocated) + _ <- IO.sleep(50.millis) + stats <- PostgresLockStats.getStats(s1) + _ <- release + _ <- fiber.join + stats2 <- PostgresLockStats.getStats(s1) + + _ = stats.waiting.size shouldBe 1 + _ = stats2.waiting shouldBe Nil + } yield () + } + } + + "remove records for the owning session only" in withContainers { cnt => + implicit val logger: Logger[IO] = TestLogger() + (session(cnt), session(cnt), session(cnt)).tupled.use { case (s1, s2, s3) => + for { + _ <- resetLockTable(s1) + (_, release) <- makeExclusiveLock(s1).run("1").allocated + f1 <- Async[IO].start(makeExclusiveLock(s2, 4.millis).run("1").allocated) + // use a longer interval so that there is no attempt to insert another record + f2 <- Async[IO].start(makeExclusiveLock(s3, 1.second).run("1").allocated) + _ <- IO.sleep(50.millis) + + // there must be two records waiting for the same lock + stats <- PostgresLockStats.getStats(s1) + _ = stats.currentLocks shouldBe 1 + _ = stats.waiting.size shouldBe 2 + _ = stats.waiting.map(_.objectId).toSet.size shouldBe 1 + _ = stats.waiting.map(_.pid).toSet.size shouldBe 2 + + // releasing the lock so that f1 or f2 grabs it + // then it must not remove the record from the other one + _ <- release + _ <- IO.sleep(200.millis) + + stats2 <- PostgresLockStats.getStats(s1) + _ = stats2.waiting.size shouldBe 1 + _ = stats2.currentLocks shouldBe 1 + + _ <- List(f1, f2).parTraverse_(_.join.flatMap { + case Outcome.Succeeded(rel) => rel.flatMap(_._2) + case _ => IO.raiseError(new Exception("joining failed")) + }) + } yield () + } + } + } +} + +object PostgresLockSpec { + abstract class TestSession extends Session[IO] { + override def execute[A, B](query: Query[A, B])(args: A): IO[List[B]] = ??? + override def option[A, B](query: Query[A, B])(args: A): IO[Option[B]] = ??? + override def stream[A, B](command: Query[A, B])(args: A, chunkSize: Int): fs2.Stream[IO, B] = ??? + override def cursor[A, B](query: Query[A, B])(args: A): Resource[IO, Cursor[IO, B]] = ??? + override def execute[A](command: Command[A])(args: A): IO[Completion] = ??? + override def pipe[A](command: Command[A]): Pipe[IO, A, Completion] = ??? + override def pipe[A, B](query: Query[A, B], chunkSize: Int): Pipe[IO, A, B] = ??? + override def parameters: concurrent.Signal[IO, Map[String, String]] = ??? + override def parameter(key: String): fs2.Stream[IO, String] = ??? + override def transactionStatus: concurrent.Signal[IO, TransactionStatus] = ??? + override def execute[A](query: Query[skunk.Void, A]): IO[List[A]] = ??? + override def unique[A](query: Query[skunk.Void, A]): IO[A] = ??? + override def unique[A, B](query: Query[A, B])(args: A): IO[B] = ??? + override def option[A](query: Query[skunk.Void, A]): IO[Option[A]] = ??? + override def execute(command: Command[skunk.Void]): IO[Completion] = ??? + override def prepare[A, B](query: Query[A, B]): IO[PreparedQuery[IO, A, B]] = ??? + override def prepare[A](command: Command[A]): IO[PreparedCommand[IO, A]] = ??? + override def channel(name: Identifier): Channel[IO, String, String] = ??? + override def transaction[A]: Resource[IO, Transaction[IO]] = ??? + override def transaction[A]( + isolationLevel: TransactionIsolationLevel, + accessMode: TransactionAccessMode + ): Resource[IO, Transaction[IO]] = ??? + override def typer: Typer = ??? + override def describeCache: Describe.Cache[IO] = ??? + override def parseCache: Parse.Cache[IO] = ??? + } +} diff --git a/graph-commons/src/test/scala/io/renku/lock/PostgresLockStatsSpec.scala b/graph-commons/src/test/scala/io/renku/lock/PostgresLockStatsSpec.scala new file mode 100644 index 0000000000..dd3e390af9 --- /dev/null +++ b/graph-commons/src/test/scala/io/renku/lock/PostgresLockStatsSpec.scala @@ -0,0 +1,99 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.lock + +import cats.effect._ +import cats.effect.testing.scalatest.AsyncIOSpec +import cats.syntax.all._ +import io.renku.interpreters.TestLogger +import io.renku.lock.PostgresLockStats.Stats +import org.scalatest.matchers.should +import org.scalatest.wordspec.AsyncWordSpec +import org.typelevel.log4cats.Logger + +class PostgresLockStatsSpec extends AsyncWordSpec with AsyncIOSpec with should.Matchers with PostgresLockTest { + implicit val logger: Logger[IO] = TestLogger[IO]() + + "PostgresLockStats" should { + "obtain empty statistics" in withContainers { cnt => + session(cnt).use { s => + for { + _ <- resetLockTable(s) + stats <- PostgresLockStats.getStats[IO](s) + _ = stats shouldBe Stats(0, Nil) + } yield () + } + } + + "show when a lock is held" in withContainers { cnt => + session(cnt).use { s => + for { + _ <- resetLockTable(s) + (_, release) <- PostgresLock.exclusive_[IO, Int](s).run(1).allocated + stats <- PostgresLockStats.getStats(s) + _ <- release + _ = stats shouldBe Stats(1, Nil) + } yield () + } + } + + "insert waiting info" in withContainers { cnt => + session(cnt).use { s => + for { + _ <- resetLockTable(s) + _ <- PostgresLockStats.recordWaiting(s)(5L) + stats <- PostgresLockStats.getStats(s) + _ = stats.currentLocks shouldBe 0 + _ = stats.waiting.size shouldBe 1 + + _ <- PostgresLockStats.recordWaiting(s)(5L) + stats2 <- PostgresLockStats.getStats(s) + _ = stats shouldBe stats2.copy(waiting = + stats2.waiting.map(_.copy(waitDuration = stats.waiting.head.waitDuration)) + ) + } yield () + } + } + + "waiting info distinguishes sessions" in withContainers { cnt => + (session(cnt), session(cnt)).tupled.use { case (s1, s2) => + for { + _ <- resetLockTable(s1) + _ <- PostgresLockStats.recordWaiting(s1)(5) + _ <- PostgresLockStats.recordWaiting(s2)(5) + stats <- PostgresLockStats.getStats(s1) + _ = stats.waiting.size shouldBe 2 + _ = stats.waiting.map(_.pid).toSet.size shouldBe 2 + } yield () + } + } + + "remove waiting info" in withContainers { cnt => + session(cnt).use { s => + for { + _ <- resetLockTable(s) + _ <- PostgresLockStats.recordWaiting(s)(5L) + _ <- PostgresLockStats.removeWaiting(s)(5L) + stats <- PostgresLockStats.getStats(s) + _ = stats shouldBe Stats(0, Nil) + } yield () + } + } + } +} diff --git a/graph-commons/src/test/scala/io/renku/lock/PostgresLockTest.scala b/graph-commons/src/test/scala/io/renku/lock/PostgresLockTest.scala new file mode 100644 index 0000000000..1d8fa85b05 --- /dev/null +++ b/graph-commons/src/test/scala/io/renku/lock/PostgresLockTest.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.lock + +import cats.effect._ +import com.dimafeng.testcontainers.PostgreSQLContainer +import com.dimafeng.testcontainers.scalatest.TestContainerForAll +import io.renku.db.PostgresContainer +import org.scalatest.Suite +import skunk.Session +import natchez.Trace.Implicits.noop +import org.typelevel.log4cats.Logger +import skunk.implicits._ + +import scala.concurrent.duration._ + +trait PostgresLockTest extends TestContainerForAll { self: Suite => + + override val containerDef = PostgreSQLContainer.Def( + dockerImageName = PostgresContainer.imageName, + databaseName = "locktest", + username = "pg", + password = "pg" + ) + + def session(c: Containers): Resource[IO, Session[IO]] = + Session.single[IO]( + host = c.host, + port = c.underlyingUnsafeContainer.getFirstMappedPort, + user = c.username, + database = c.databaseName, + password = Some(c.password) + ) + + def makeExclusiveLock(s: Session[IO], interval: FiniteDuration = 100.millis)(implicit L: Logger[IO]) = + PostgresLock.exclusive_[IO, String](s, interval) + + def exclusiveLock(cnt: Containers, interval: FiniteDuration = 100.millis)(implicit L: Logger[IO]) = + PostgresLock.exclusive[IO, String](session(cnt), interval) + + def sharedLock(cnt: Containers, interval: FiniteDuration = 100.millis)(implicit L: Logger[IO]) = + PostgresLock.shared[IO, String](session(cnt), interval) + + def resetLockTable(s: Session[IO]) = + PostgresLockStats.ensureStatsTable[IO](s) *> + s.execute(sql"DELETE FROM kg_lock_stats".command).void + +} diff --git a/graph-commons/src/test/scala/io/renku/logging/ExecutionTimeRecorderSpec.scala b/graph-commons/src/test/scala/io/renku/logging/ExecutionTimeRecorderSpec.scala index a373696653..7b52f8a193 100644 --- a/graph-commons/src/test/scala/io/renku/logging/ExecutionTimeRecorderSpec.scala +++ b/graph-commons/src/test/scala/io/renku/logging/ExecutionTimeRecorderSpec.scala @@ -122,6 +122,7 @@ class ExecutionTimeRecorderSpec labelValue shouldBe "label" mock[Histogram.Timer[IO]].pure[IO] } + override def observe(labelValue: String, amt: Double): IO[Unit] = ??? } override val executionTimeRecorder = new ExecutionTimeRecorderImpl(loggingThreshold, Some(histogram)) diff --git a/graph-commons/src/test/scala/io/renku/metrics/HistogramSpec.scala b/graph-commons/src/test/scala/io/renku/metrics/HistogramSpec.scala index 0d1dc0fb8d..3cc9ae87ee 100644 --- a/graph-commons/src/test/scala/io/renku/metrics/HistogramSpec.scala +++ b/graph-commons/src/test/scala/io/renku/metrics/HistogramSpec.scala @@ -19,6 +19,7 @@ package io.renku.metrics import cats.syntax.all._ +import eu.timepit.refined.auto._ import io.renku.generators.Generators.Implicits._ import io.renku.generators.Generators._ import io.renku.metrics.MetricsTools._ @@ -67,6 +68,14 @@ class SingleValueHistogramSpec extends AnyWordSpec with MockFactory with should. } } + "observe" should { + "call the underlying impl" in new TestCase { + val histogram = new LabeledHistogramImpl[Try](name, help, "label", Seq(0.1)) + histogram.observe("label", 101d) + histogram.wrappedCollector.labels("label").get().sum shouldBe 101d + } + } + private trait TestCase { val name = nonBlankStrings().generateOne val help = sentences().generateOne diff --git a/graph-commons/src/test/scala/io/renku/metrics/TestLabeledHistogram.scala b/graph-commons/src/test/scala/io/renku/metrics/TestLabeledHistogram.scala index b35bf0aca5..2c5f6120e7 100644 --- a/graph-commons/src/test/scala/io/renku/metrics/TestLabeledHistogram.scala +++ b/graph-commons/src/test/scala/io/renku/metrics/TestLabeledHistogram.scala @@ -64,6 +64,11 @@ class TestLabeledHistogram(labelName: String) extends LabeledHistogram[IO] with override def startTimer(labelValue: String): IO[Histogram.Timer[IO]] = IO { new LabeledHistogram.NoThresholdTimerImpl[IO](wrappedCollector.labels(labelValue).startTimer()) } + + override def observe(labelValue: String, amt: Double): IO[Unit] = + IO { + wrappedCollector.labels(labelValue).observe(amt) + } } object TestLabeledHistogram { diff --git a/helm-chart/renku-graph/templates/triples-generator-deployment.yaml b/helm-chart/renku-graph/templates/triples-generator-deployment.yaml index 710b09775e..e5ebd11a86 100644 --- a/helm-chart/renku-graph/templates/triples-generator-deployment.yaml +++ b/helm-chart/renku-graph/templates/triples-generator-deployment.yaml @@ -64,6 +64,23 @@ spec: secretKeyRef: name: {{ template "jena.fullname" . }} key: jena-users-renku-password + - name: TRIPLES_GENERATOR_POSTGRES_HOST + value: "{{ template "postgresql.fullname" . }}" + - name: TRIPLES_GENERATOR_POSTGRES_PORT + value: "5432" + - name: TRIPLES_GENERATOR_POSTGRES_USER + value: {{ .Values.global.graph.triplesGenerator.postgresUser }} + - name: TRIPLES_GENERATOR_POSTGRES_PASSWORD + valueFrom: + secretKeyRef: +{{- if .Values.global.graph.triplesGenerator.existingSecret }} + name: {{ tpl .Values.global.graph.triplesGenerator.existingSecret . }} +{{- else }} + name: {{ template "triplesGenerator.fullname" . }}-postgres +{{- end }} + key: graph-triplesGenerator-postgresPassword + - name: TRIPLES_GENERATOR_POSTGRES_CONNECTION_POOL + value: "{{ .Values.triplesGenerator.connectionPool }}" - name: RENKU_DISABLE_VERSION_CHECK value: "true" - name: SENTRY_ENABLED diff --git a/helm-chart/renku-graph/templates/triples-generator-postgres-secret.yaml b/helm-chart/renku-graph/templates/triples-generator-postgres-secret.yaml new file mode 100644 index 0000000000..b654204605 --- /dev/null +++ b/helm-chart/renku-graph/templates/triples-generator-postgres-secret.yaml @@ -0,0 +1,21 @@ +{{- if not .Values.global.graph.triplesGenerator.existingSecret }} +apiVersion: v1 +kind: Secret +metadata: + name: {{ include "triplesGenerator.fullname" . }}-postgres + labels: + app: {{ template "triplesGenerator.name" . }} + chart: {{ template "graph.chart" . }} + release: {{ .Release.Name }} + heritage: {{ .Release.Service }} + annotations: + {{ if or .Values.global.graph.triplesGenerator.postgresPassword.value .Values.global.graph.triplesGenerator.postgresPassword.overwriteOnHelmUpgrade -}} + "helm.sh/hook": "pre-install,pre-upgrade,pre-rollback" + {{- else -}} + "helm.sh/hook": "pre-install,pre-rollback" + {{- end }} + "helm.sh/hook-delete-policy": "before-hook-creation" +type: Opaque +data: + triplesGenerator-postgresPassword: {{ default (randAlphaNum 64) .Values.global.graph.triplesGenerator.postgresPassword.value | b64enc | quote }} +{{- end }} diff --git a/helm-chart/renku-graph/values.yaml b/helm-chart/renku-graph/values.yaml index 7602b1120d..c429af12a2 100644 --- a/helm-chart/renku-graph/values.yaml +++ b/helm-chart/renku-graph/values.yaml @@ -20,11 +20,16 @@ global: # Password for the `eventlog` user # Generate one using: `openssl rand -hex 16` value: - tokenRepository: - postgresPassword: - # Password for the `tokenstorage` user - # Generate one using: `openssl rand -hex 16` - value: + tokenRepository: + postgresPassword: + # Password for the `tokenstorage` user + # Generate one using: `openssl rand -hex 16` + value: + triplesGenerator: + postgresPassword: + # Password for the `triplesgenerator` user + # Generate one using: `openssl rand -hex 16` + value: ## to enable debug mode debug: false @@ -130,6 +135,7 @@ triplesGenerator: ## a demanded number of concurrent triples generation processes generationProcessesNumber: 2 transformationProcessesNumber: 2 + connectionPool: 10 # set this to a pip-installable renku-python version which will be installed on startup renkuPythonDevVersion: diff --git a/triples-generator/src/main/resources/application.conf b/triples-generator/src/main/resources/application.conf index 0c56cdbf6e..9c040dec37 100644 --- a/triples-generator/src/main/resources/application.conf +++ b/triples-generator/src/main/resources/application.conf @@ -31,6 +31,22 @@ compatibility { renku-python-dev-version = ${?RENKU_PYTHON_DEV_VERSION} +triples-generator-db { + db-host = "localhost" + db-host = ${?TRIPLES_GENERATOR_POSTGRES_HOST} + db-port = 5432 + db-port = ${?TRIPLES_GENERATOR_POSTGRES_PORT} + db-user = "triplesgenerator" + db-user = ${?TRIPLES_GENERATOR_POSTGRES_USER} + db-pass = "triplesgeneratorpass" + db-pass = ${?TRIPLES_GENERATOR_POSTGRES_PASSWORD} + db-url-template = "jdbc:postgresql://$host/$dbName" + connection-pool = 8 + connection-pool = ${?TRIPLES_GENERATOR_POSTGRES_CONNECTION_POOL} +} + +metrics-interval = "30 seconds" + services { triples-generator { diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/Microservice.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/Microservice.scala index 02b57ba513..ecf1b7e849 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/Microservice.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/Microservice.scala @@ -25,9 +25,11 @@ import com.typesafe.config.{Config, ConfigFactory} import fs2.concurrent.{Signal, SignallingRef} import io.renku.config.certificates.CertificateLoader import io.renku.config.sentry.SentryInitializer +import io.renku.db.{SessionPoolResource, SessionResource} import io.renku.entities.viewings import io.renku.events.consumers import io.renku.events.consumers.EventConsumersRegistry +import io.renku.graph.model.projects import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.GitLabClient import io.renku.http.server.HttpServer @@ -39,10 +41,13 @@ import io.renku.triplesgenerator.events.consumers._ import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesgenerator.events.consumers.tsprovisioning.{minprojectinfo, triplesgenerated} import io.renku.triplesgenerator.init.{CliVersionCompatibilityChecker, CliVersionCompatibilityVerifier} +import io.renku.triplesgenerator.metrics.MetricsService import io.renku.triplesstore.{ProjectsConnectionConfig, SparqlQueryTimeRecorder} +import natchez.Trace.Implicits.noop import org.http4s.server.Server import org.typelevel.log4cats.Logger +import scala.concurrent.duration._ import scala.util.control.NonFatal object Microservice extends IOMicroservice { @@ -57,25 +62,44 @@ object Microservice extends IOMicroservice { } } - override def run(args: List[String]): IO[ExitCode] = for { + override def run(args: List[String]): IO[ExitCode] = { + val resources = for { + config <- Resource.eval(parseConfigArgs(args)) + dbSessionPool <- Resource + .eval(new TgLockDbConfigProvider[IO].map(SessionPoolResource[IO, TgLockDB])) + .flatMap(identity) + } yield (config, dbSessionPool) + + resources.use { case (config, dbSessionPool) => + doRun(config, dbSessionPool) + } + } + + private def doRun(config: Config, dbSessionPool: SessionResource[IO, TgLockDB]): IO[ExitCode] = for { implicit0(mr: MetricsRegistry[IO]) <- MetricsRegistry[IO]() implicit0(sqtr: SparqlQueryTimeRecorder[IO]) <- SparqlQueryTimeRecorder[IO]() implicit0(gc: GitLabClient[IO]) <- GitLabClient[IO]() implicit0(acf: AccessTokenFinder[IO]) <- AccessTokenFinder[IO]() implicit0(rp: ReProvisioningStatus[IO]) <- ReProvisioningStatus[IO]() - config <- parseConfigArgs(args) - projectConnConfig <- ProjectsConnectionConfig[IO](config) - certificateLoader <- CertificateLoader[IO] - gitCertificateInstaller <- GitCertificateInstaller[IO] - sentryInitializer <- SentryInitializer[IO] - cliVersionCompatChecker <- CliVersionCompatibilityChecker[IO](config) - awaitingGenerationSubscription <- awaitinggeneration.SubscriptionFactory[IO] - membersSyncSubscription <- membersync.SubscriptionFactory[IO] - triplesGeneratedSubscription <- triplesgenerated.SubscriptionFactory[IO] - cleanUpSubscription <- cleanup.SubscriptionFactory[IO] - minProjectInfoSubscription <- minprojectinfo.SubscriptionFactory[IO] - migrationRequestSubscription <- tsmigrationrequest.SubscriptionFactory[IO](config) - syncRepoMetadataSubscription <- syncrepometadata.SubscriptionFactory[IO](config) + + _ <- TgLockDB.migrate[IO](dbSessionPool, 20.seconds) + + metricsService <- MetricsService[IO](dbSessionPool) + _ <- metricsService.collectEvery(Duration.fromNanos(config.getDuration("metrics-interval").toNanos)).start + + tsWriteLock = TgLockDB.createLock[IO, projects.Path](dbSessionPool, 0.5.seconds) + projectConnConfig <- ProjectsConnectionConfig[IO](config) + certificateLoader <- CertificateLoader[IO] + gitCertificateInstaller <- GitCertificateInstaller[IO] + sentryInitializer <- SentryInitializer[IO] + cliVersionCompatChecker <- CliVersionCompatibilityChecker[IO](config) + awaitingGenerationSubscription <- awaitinggeneration.SubscriptionFactory[IO] + membersSyncSubscription <- membersync.SubscriptionFactory[IO](tsWriteLock) + triplesGeneratedSubscription <- triplesgenerated.SubscriptionFactory[IO](tsWriteLock) + cleanUpSubscription <- cleanup.SubscriptionFactory[IO](tsWriteLock) + minProjectInfoSubscription <- minprojectinfo.SubscriptionFactory[IO](tsWriteLock) + migrationRequestSubscription <- tsmigrationrequest.SubscriptionFactory[IO](config) + syncRepoMetadataSubscription <- syncrepometadata.SubscriptionFactory[IO](config, tsWriteLock) projectActivationsSubscription <- viewings.collector.projects.activated.SubscriptionFactory[IO](projectConnConfig) projectViewingsSubscription <- viewings.collector.projects.viewed.SubscriptionFactory[IO](projectConnConfig) datasetViewingsSubscription <- viewings.collector.datasets.SubscriptionFactory[IO](projectConnConfig) @@ -131,6 +155,7 @@ private class MicroserviceRunner[F[_]: Async: Logger]( _ <- Resource.eval(cliVersionCompatibilityVerifier.run) _ <- Spawn[F].background(serviceReadinessChecker.waitIfNotUp >> eventConsumersRegistry.run) server <- httpServer.createServer + _ <- Resource.eval(Logger[F].info(s"Triples-Generator service started")) } yield server } recoverWith logAndThrow diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/TgLockDB.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/TgLockDB.scala new file mode 100644 index 0000000000..4fda9d1202 --- /dev/null +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/TgLockDB.scala @@ -0,0 +1,67 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesgenerator + +import cats._ +import cats.effect.{MonadCancelThrow, Temporal} +import cats.syntax.all._ +import eu.timepit.refined.auto._ +import fs2.Stream +import io.renku.db.DBConfigProvider +import io.renku.graph.model.projects +import io.renku.lock.{Lock, LongKey, PostgresLock, PostgresLockStats} +import org.typelevel.log4cats.Logger + +import scala.concurrent.duration.FiniteDuration + +sealed trait TgLockDB + +object TgLockDB { + type TsWriteLock[F[_]] = Lock[F, projects.Path] + + type SessionResource[F[_]] = io.renku.db.SessionResource[F, TgLockDB] + + object SessionResource { + def apply[F[_]](implicit sr: SessionResource[F]): SessionResource[F] = sr + } + + def createLock[F[_]: Logger: Temporal, A: LongKey]( + sessionPool: SessionResource[F], + interval: FiniteDuration + ): Lock[F, A] = + PostgresLock.exclusive[F, A](sessionPool.session, interval) + + def migrate[F[_]: MonadCancelThrow: Temporal: Logger](dbPool: SessionResource[F], retry: FiniteDuration): F[Unit] = { + val run = dbPool.session.use(PostgresLockStats.ensureStatsTable[F]).attempt + (Stream.eval(run) ++ Stream.awakeDelay(retry).evalMap(_ => run)) + .evalMap { + case Right(_) => Logger[F].info(s"triples_generator db migration done").as(0) + case Left(ex) => Logger[F].error(ex)(s"Error running triples_generator migrations").as(1) + } + .find(_ == 0) + .compile + .drain + } +} + +class TgLockDbConfigProvider[F[_]: MonadThrow]() + extends DBConfigProvider[F, TgLockDB]( + namespace = "triples-generator-db", + dbName = "triples_generator" + ) diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/TSReadinessForEventsChecker.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/TSReadinessForEventsChecker.scala index f47eeba1aa..0243c72158 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/TSReadinessForEventsChecker.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/TSReadinessForEventsChecker.scala @@ -28,7 +28,7 @@ import TSStateChecker.TSState.{MissingDatasets, ReProvisioning, Ready} import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger -private trait TSReadinessForEventsChecker[F[_]] { +private[consumers] trait TSReadinessForEventsChecker[F[_]] { def verifyTSReady: F[Option[EventSchedulingResult]] } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/SubscriptionFactory.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/SubscriptionFactory.scala index 27a1b23278..faea3f548a 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/SubscriptionFactory.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/awaitinggeneration/SubscriptionFactory.scala @@ -20,15 +20,15 @@ package io.renku.triplesgenerator.events.consumers.awaitinggeneration import cats.effect.Async import cats.syntax.all._ -import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.events.Subscription.SubscriberCapacity import io.renku.events.consumers +import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.events.consumers.subscriptions.SubscriptionPayloadComposer.defaultSubscriptionPayloadComposerFactory import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.GitLabClient import io.renku.metrics.MetricsRegistry -import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesgenerator.Microservice +import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/cleanup/EventHandler.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/cleanup/EventHandler.scala index 79cbae12eb..3d8dd47e55 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/cleanup/EventHandler.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/cleanup/EventHandler.scala @@ -25,6 +25,8 @@ import io.renku.events.consumers.ProcessExecutor import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.events.{CategoryName, consumers} import io.renku.metrics.MetricsRegistry +import io.renku.lock.syntax._ +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.api.events.CleanUpEvent import io.renku.triplesgenerator.events.consumers.TSReadinessForEventsChecker import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus @@ -36,15 +38,16 @@ private class EventHandler[F[_]: MonadCancelThrow: Logger]( tsReadinessChecker: TSReadinessForEventsChecker[F], eventProcessor: EventProcessor[F], subscriptionMechanism: SubscriptionMechanism[F], - processExecutor: ProcessExecutor[F] + processExecutor: ProcessExecutor[F], + tsWriteLock: TsWriteLock[F] ) extends consumers.EventHandlerWithProcessLimiter[F](processExecutor) { protected override type Event = CleanUpEvent override def createHandlingDefinition(): EventHandlingDefinition = EventHandlingDefinition( - _.event.as[CleanUpEvent], - e => eventProcessor.process(e.project), + decode = _.event.as[CleanUpEvent], + process = tsWriteLock.contramap[Event](_.project.path).surround(e => eventProcessor.process(e.project)), precondition = tsReadinessChecker.verifyTSReady, onRelease = subscriptionMechanism.renewSubscription().some ) @@ -53,10 +56,18 @@ private class EventHandler[F[_]: MonadCancelThrow: Logger]( private object EventHandler { def apply[F[_]: Async: ReProvisioningStatus: Logger: MetricsRegistry: SparqlQueryTimeRecorder]( - subscriptionMechanism: SubscriptionMechanism[F] + subscriptionMechanism: SubscriptionMechanism[F], + tsWriteLock: TsWriteLock[F] ): F[consumers.EventHandler[F]] = for { tsReadinessChecker <- TSReadinessForEventsChecker[F] eventProcessor <- EventProcessor[F] processExecutor <- ProcessExecutor.concurrent(processesCount = 1) - } yield new EventHandler[F](categoryName, tsReadinessChecker, eventProcessor, subscriptionMechanism, processExecutor) + } yield new EventHandler[F]( + categoryName, + tsReadinessChecker, + eventProcessor, + subscriptionMechanism, + processExecutor, + tsWriteLock + ) } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/cleanup/SubscriptionFactory.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/cleanup/SubscriptionFactory.scala index 780129a548..69e295abe1 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/cleanup/SubscriptionFactory.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/cleanup/SubscriptionFactory.scala @@ -25,19 +25,21 @@ import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.events.consumers.subscriptions.SubscriptionPayloadComposer.defaultSubscriptionPayloadComposerFactory import io.renku.metrics.MetricsRegistry import io.renku.triplesgenerator.Microservice +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger object SubscriptionFactory { - def apply[F[_]: Async: ReProvisioningStatus: Logger: MetricsRegistry: SparqlQueryTimeRecorder] - : F[(consumers.EventHandler[F], SubscriptionMechanism[F])] = for { + def apply[F[_]: Async: ReProvisioningStatus: Logger: MetricsRegistry: SparqlQueryTimeRecorder]( + tsWriteLock: TsWriteLock[F] + ): F[(consumers.EventHandler[F], SubscriptionMechanism[F])] = for { subscriptionMechanism <- SubscriptionMechanism( categoryName, defaultSubscriptionPayloadComposerFactory(Microservice.ServicePort, Microservice.Identifier) ) _ <- ReProvisioningStatus[F].registerForNotification(subscriptionMechanism) - handler <- EventHandler(subscriptionMechanism) + handler <- EventHandler(subscriptionMechanism, tsWriteLock) } yield handler -> subscriptionMechanism } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandler.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandler.scala index 535641b716..ce0079c834 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandler.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandler.scala @@ -21,12 +21,15 @@ package membersync import cats.effect.{Async, MonadCancelThrow} import cats.syntax.all._ -import io.renku.events.{consumers, CategoryName} +import io.renku.events.{CategoryName, consumers} import io.renku.events.consumers.ProcessExecutor import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.graph.model.projects import io.renku.graph.tokenrepository.AccessTokenFinder +import io.renku.lock.syntax._ import io.renku.http.client.GitLabClient +import io.renku.lock.Lock +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger import tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus @@ -36,7 +39,8 @@ private class EventHandler[F[_]: MonadCancelThrow: Logger]( tsReadinessChecker: TSReadinessForEventsChecker[F], membersSynchronizer: MembersSynchronizer[F], subscriptionMechanism: SubscriptionMechanism[F], - processExecutor: ProcessExecutor[F] + processExecutor: ProcessExecutor[F], + tsWriteLock: TsWriteLock[F] ) extends consumers.EventHandlerWithProcessLimiter[F](processExecutor) { import io.renku.events.consumers.EventDecodingTools._ @@ -48,7 +52,7 @@ private class EventHandler[F[_]: MonadCancelThrow: Logger]( override def createHandlingDefinition(): EventHandlingDefinition = EventHandlingDefinition( decode = _.event.getProjectPath, - process = synchronizeMembers, + process = (tsWriteLock: Lock[F, projects.Path]).surround[Unit](synchronizeMembers _), precondition = verifyTSReady, onRelease = subscriptionMechanism.renewSubscription().some ) @@ -59,15 +63,18 @@ private object EventHandler { import eu.timepit.refined.auto._ def apply[F[_]: Async: ReProvisioningStatus: GitLabClient: AccessTokenFinder: SparqlQueryTimeRecorder: Logger]( - subscriptionMechanism: SubscriptionMechanism[F] + subscriptionMechanism: SubscriptionMechanism[F], + tsWriteLock: TsWriteLock[F] ): F[consumers.EventHandler[F]] = for { tsReadinessChecker <- TSReadinessForEventsChecker[F] membersSynchronizer <- MembersSynchronizer[F] processExecutor <- ProcessExecutor.concurrent(processesCount = 1) - } yield new EventHandler[F](categoryName, - tsReadinessChecker, - membersSynchronizer, - subscriptionMechanism, - processExecutor + } yield new EventHandler[F]( + categoryName, + tsReadinessChecker, + membersSynchronizer, + subscriptionMechanism, + processExecutor, + tsWriteLock ) } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/SubscriptionFactory.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/SubscriptionFactory.scala index 54cf1a56fc..5f8d6238b8 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/SubscriptionFactory.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/SubscriptionFactory.scala @@ -26,20 +26,22 @@ import io.renku.events.consumers.subscriptions.SubscriptionPayloadComposer.defau import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.GitLabClient import io.renku.triplesgenerator.Microservice +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger object SubscriptionFactory { - def apply[F[_]: Async: Logger: ReProvisioningStatus: GitLabClient: AccessTokenFinder: SparqlQueryTimeRecorder] - : F[(consumers.EventHandler[F], SubscriptionMechanism[F])] = for { + def apply[F[_]: Async: Logger: ReProvisioningStatus: GitLabClient: AccessTokenFinder: SparqlQueryTimeRecorder]( + tsWriteLock: TsWriteLock[F] + ): F[(consumers.EventHandler[F], SubscriptionMechanism[F])] = for { subscriptionMechanism <- SubscriptionMechanism( categoryName, defaultSubscriptionPayloadComposerFactory(Microservice.ServicePort, Microservice.Identifier) ) _ <- ReProvisioningStatus[F].registerForNotification(subscriptionMechanism) - handler <- EventHandler[F](subscriptionMechanism) + handler <- EventHandler[F](subscriptionMechanism, tsWriteLock) } yield handler -> subscriptionMechanism } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/EventHandler.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/EventHandler.scala index 05777137fa..2932f9fbac 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/EventHandler.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/EventHandler.scala @@ -25,9 +25,11 @@ import com.typesafe.config.Config import eu.timepit.refined.auto._ import io.renku.events.consumers.ProcessExecutor import io.renku.events.{CategoryName, consumers} +import io.renku.lock.syntax._ import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.GitLabClient import io.renku.metrics.MetricsRegistry +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.api.events.SyncRepoMetadata import io.renku.triplesgenerator.events.consumers.TSReadinessForEventsChecker import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus @@ -35,12 +37,13 @@ import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger import processor.EventProcessor -private class EventHandler[F[_]: MonadCancelThrow: Logger]( +private[syncrepometadata] class EventHandler[F[_]: MonadCancelThrow: Logger]( override val categoryName: CategoryName, tsReadinessChecker: TSReadinessForEventsChecker[F], eventDecoder: EventDecoder, eventProcessor: EventProcessor[F], - processExecutor: ProcessExecutor[F] + processExecutor: ProcessExecutor[F], + tsWriteLock: TsWriteLock[F] ) extends consumers.EventHandlerWithProcessLimiter[F](processExecutor) { protected override type Event = SyncRepoMetadata @@ -48,7 +51,7 @@ private class EventHandler[F[_]: MonadCancelThrow: Logger]( override def createHandlingDefinition(): EventHandlingDefinition = EventHandlingDefinition( eventDecoder.decode, - eventProcessor.process, + tsWriteLock.contramap[Event](_.path).surround(eventProcessor.process), precondition = tsReadinessChecker.verifyTSReady ) } @@ -57,10 +60,18 @@ private object EventHandler { def apply[F[ _ ]: Async: NonEmptyParallel: GitLabClient: AccessTokenFinder: Logger: ReProvisioningStatus: SparqlQueryTimeRecorder: MetricsRegistry]( - config: Config + config: Config, + tsWriteLock: TsWriteLock[F] ): F[consumers.EventHandler[F]] = for { tsReadinessChecker <- TSReadinessForEventsChecker[F] eventProcessor <- EventProcessor[F](config) processExecutor <- ProcessExecutor.concurrent(1) - } yield new EventHandler[F](categoryName, tsReadinessChecker, EventDecoder, eventProcessor, processExecutor) + } yield new EventHandler[F]( + categoryName, + tsReadinessChecker, + EventDecoder, + eventProcessor, + processExecutor, + tsWriteLock + ) } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/SubscriptionFactory.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/SubscriptionFactory.scala index 55fd1de5a5..d5a5f1627e 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/SubscriptionFactory.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/SubscriptionFactory.scala @@ -27,6 +27,7 @@ import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.GitLabClient import io.renku.metrics.MetricsRegistry +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger @@ -36,7 +37,8 @@ object SubscriptionFactory { def apply[F[ _ ]: Async: NonEmptyParallel: GitLabClient: AccessTokenFinder: Logger: ReProvisioningStatus: SparqlQueryTimeRecorder: MetricsRegistry]( - config: Config + config: Config, + tsWriteLock: TsWriteLock[F] ): F[(consumers.EventHandler[F], SubscriptionMechanism[F])] = - EventHandler[F](config).map(_ -> SubscriptionMechanism.noOpSubscriptionMechanism(categoryName)) + EventHandler[F](config, tsWriteLock).map(_ -> SubscriptionMechanism.noOpSubscriptionMechanism(categoryName)) } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/EventHandler.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/EventHandler.scala index 2a8d5e479a..3c982ee966 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/EventHandler.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/EventHandler.scala @@ -27,7 +27,9 @@ import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.events.consumers.ProcessExecutor import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.GitLabClient +import io.renku.lock.syntax._ import io.renku.metrics.MetricsRegistry +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger @@ -37,7 +39,8 @@ private class EventHandler[F[_]: MonadCancelThrow: Logger]( tsReadinessChecker: TSReadinessForEventsChecker[F], subscriptionMechanism: SubscriptionMechanism[F], eventProcessor: EventProcessor[F], - processExecutor: ProcessExecutor[F] + processExecutor: ProcessExecutor[F], + tsWriteLock: TsWriteLock[F] ) extends consumers.EventHandlerWithProcessLimiter[F](processExecutor) { import io.renku.events.consumers.EventDecodingTools._ @@ -47,7 +50,7 @@ private class EventHandler[F[_]: MonadCancelThrow: Logger]( override def createHandlingDefinition(): EventHandlingDefinition = EventHandlingDefinition( _.event.getProject.map(MinProjectInfoEvent(_)), - eventProcessor.process, + tsWriteLock.contramap[Event](_.project.path).surround(eventProcessor.process), precondition = tsReadinessChecker.verifyTSReady, onRelease = subscriptionMechanism.renewSubscription().some ) @@ -64,11 +67,19 @@ private object EventHandler { _ ]: Async: NonEmptyParallel: Parallel: ReProvisioningStatus: GitLabClient: AccessTokenFinder: MetricsRegistry: Logger: SparqlQueryTimeRecorder]( subscriptionMechanism: SubscriptionMechanism[F], + tsWriteLock: TsWriteLock[F], config: Config = ConfigFactory.load() ): F[consumers.EventHandler[F]] = for { tsReadinessChecker <- TSReadinessForEventsChecker[F] eventProcessor <- EventProcessor[F] processesCount <- find[F, Int Refined Positive]("add-min-project-info-max-concurrent-processes", config) processExecutor <- ProcessExecutor.concurrent(processesCount) - } yield new EventHandler[F](categoryName, tsReadinessChecker, subscriptionMechanism, eventProcessor, processExecutor) + } yield new EventHandler[F]( + categoryName, + tsReadinessChecker, + subscriptionMechanism, + eventProcessor, + processExecutor, + tsWriteLock + ) } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/SubscriptionFactory.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/SubscriptionFactory.scala index 531e7dcf50..2d22a22598 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/SubscriptionFactory.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/SubscriptionFactory.scala @@ -28,6 +28,7 @@ import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.GitLabClient import io.renku.metrics.MetricsRegistry import io.renku.triplesgenerator.Microservice +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger @@ -36,14 +37,15 @@ object SubscriptionFactory { def apply[F[ _ - ]: Async: NonEmptyParallel: Parallel: ReProvisioningStatus: GitLabClient: AccessTokenFinder: MetricsRegistry: Logger: SparqlQueryTimeRecorder] - : F[(consumers.EventHandler[F], SubscriptionMechanism[F])] = for { + ]: Async: NonEmptyParallel: Parallel: ReProvisioningStatus: GitLabClient: AccessTokenFinder: MetricsRegistry: Logger: SparqlQueryTimeRecorder]( + tsWriteLock: TsWriteLock[F] + ): F[(consumers.EventHandler[F], SubscriptionMechanism[F])] = for { subscriptionMechanism <- SubscriptionMechanism( categoryName, defaultSubscriptionPayloadComposerFactory(Microservice.ServicePort, Microservice.Identifier) ) _ <- ReProvisioningStatus[F].registerForNotification(subscriptionMechanism) - handler <- EventHandler[F](subscriptionMechanism) + handler <- EventHandler[F](subscriptionMechanism, tsWriteLock) } yield handler -> subscriptionMechanism } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/EventHandler.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/EventHandler.scala index 0755962119..e4d0bda890 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/EventHandler.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/EventHandler.scala @@ -19,18 +19,20 @@ package io.renku.triplesgenerator.events.consumers package tsprovisioning.triplesgenerated -import cats.{NonEmptyParallel, Parallel} import cats.effect._ import cats.syntax.all._ -import io.renku.events.{CategoryName, consumers} +import cats.{NonEmptyParallel, Parallel} import io.renku.events.consumers.ProcessExecutor import io.renku.events.consumers.subscriptions.SubscriptionMechanism +import io.renku.events.{CategoryName, consumers} import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.GitLabClient +import io.renku.lock.syntax._ import io.renku.metrics.MetricsRegistry +import io.renku.triplesgenerator.TgLockDB.TsWriteLock +import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger -import tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus private class EventHandler[F[_]: MonadCancelThrow: Logger]( override val categoryName: CategoryName, @@ -38,7 +40,8 @@ private class EventHandler[F[_]: MonadCancelThrow: Logger]( eventDecoder: EventDecoder, subscriptionMechanism: SubscriptionMechanism[F], eventProcessor: EventProcessor[F], - processExecutor: ProcessExecutor[F] + processExecutor: ProcessExecutor[F], + tsWriteLock: TsWriteLock[F] ) extends consumers.EventHandlerWithProcessLimiter[F](processExecutor) { protected override type Event = TriplesGeneratedEvent @@ -46,7 +49,7 @@ private class EventHandler[F[_]: MonadCancelThrow: Logger]( override def createHandlingDefinition(): EventHandlingDefinition = EventHandlingDefinition( eventDecoder.decode, - eventProcessor.process, + tsWriteLock.contramap[Event](_.project.path).surround(eventProcessor.process _), precondition = tsReadinessChecker.verifyTSReady, onRelease = subscriptionMechanism.renewSubscription().some ) @@ -58,16 +61,19 @@ private object EventHandler { _ ]: Async: NonEmptyParallel: Parallel: ReProvisioningStatus: GitLabClient: AccessTokenFinder: Logger: MetricsRegistry: SparqlQueryTimeRecorder]( subscriptionMechanism: SubscriptionMechanism[F], - concurrentProcessesNumber: ConcurrentProcessesNumber + concurrentProcessesNumber: ConcurrentProcessesNumber, + tsWriteLock: TsWriteLock[F] ): F[consumers.EventHandler[F]] = for { tsReadinessChecker <- TSReadinessForEventsChecker[F] eventProcessor <- EventProcessor[F] processExecutor <- ProcessExecutor.concurrent(concurrentProcessesNumber.asRefined) - } yield new EventHandler[F](categoryName, - tsReadinessChecker, - EventDecoder(), - subscriptionMechanism, - eventProcessor, - processExecutor + } yield new EventHandler[F]( + categoryName, + tsReadinessChecker, + EventDecoder(), + subscriptionMechanism, + eventProcessor, + processExecutor, + tsWriteLock ) } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/SubscriptionFactory.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/SubscriptionFactory.scala index 3b50c97777..b284a88616 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/SubscriptionFactory.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/SubscriptionFactory.scala @@ -18,17 +18,18 @@ package io.renku.triplesgenerator.events.consumers.tsprovisioning.triplesgenerated -import cats.{NonEmptyParallel, Parallel} import cats.effect.Async import cats.syntax.all._ -import io.renku.events.consumers.subscriptions.SubscriptionMechanism +import cats.{NonEmptyParallel, Parallel} import io.renku.events.Subscription.SubscriberCapacity import io.renku.events.consumers +import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.events.consumers.subscriptions.SubscriptionPayloadComposer.defaultSubscriptionPayloadComposerFactory import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.GitLabClient import io.renku.metrics.MetricsRegistry import io.renku.triplesgenerator.Microservice +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger @@ -36,8 +37,9 @@ import org.typelevel.log4cats.Logger object SubscriptionFactory { def apply[F[ _ - ]: Async: NonEmptyParallel: Parallel: ReProvisioningStatus: GitLabClient: AccessTokenFinder: Logger: MetricsRegistry: SparqlQueryTimeRecorder] - : F[(consumers.EventHandler[F], SubscriptionMechanism[F])] = for { + ]: Async: NonEmptyParallel: Parallel: ReProvisioningStatus: GitLabClient: AccessTokenFinder: Logger: MetricsRegistry: SparqlQueryTimeRecorder]( + tsWriteLock: TsWriteLock[F] + ): F[(consumers.EventHandler[F], SubscriptionMechanism[F])] = for { concurrentProcessesNumber <- ConcurrentProcessesNumber[F]() subscriptionMechanism <- SubscriptionMechanism( @@ -48,6 +50,6 @@ object SubscriptionFactory { ) ) _ <- ReProvisioningStatus[F].registerForNotification(subscriptionMechanism) - handler <- EventHandler(subscriptionMechanism, concurrentProcessesNumber) + handler <- EventHandler(subscriptionMechanism, concurrentProcessesNumber, tsWriteLock) } yield handler -> subscriptionMechanism } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/metrics/MetricsService.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/metrics/MetricsService.scala new file mode 100644 index 0000000000..c3d194a951 --- /dev/null +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/metrics/MetricsService.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesgenerator.metrics + +import cats.Traverse +import cats.data.Kleisli +import cats.effect._ +import cats.syntax.all._ +import fs2.{Compiler, Stream} +import eu.timepit.refined.auto._ +import io.renku.lock.PostgresLockStats +import io.renku.metrics.MetricsRegistry +import io.renku.triplesgenerator.TgLockDB + +import scala.concurrent.duration.FiniteDuration + +trait MetricsService[F[_]] { + + def collect: F[Unit] + + def collectEvery(interval: FiniteDuration)(implicit C: Compiler[F, F], T: Temporal[F]) = + Stream.awakeEvery(interval).evalMap(_ => collect).compile.drain +} + +object MetricsService { + + def apply[F[_]: Async: MetricsRegistry](dbPool: TgLockDB.SessionResource[F]): F[MetricsService[F]] = { + val pgGauge = PostgresLockGauge[F]("triples_generator") + val pgHg = PostgresLockHistogram[F] + val getStats = dbPool.useK(Kleisli(PostgresLockStats.getStats[F])) + + (pgGauge, pgHg).mapN { (gauge, hg) => + new MetricsService[F] { + override def collect: F[Unit] = + getStats.flatMap { stats => + Traverse[List].sequence_( + gauge.set(PostgresLockGauge.Label.CurrentLocks -> stats.currentLocks.toDouble) :: + gauge.set(PostgresLockGauge.Label.Waiting -> stats.waiting.size.toDouble) :: + stats.waiting.map(w => hg.observe(w.objectId.toString, w.waitDuration.toMillis.toDouble)) + ) + } + } + } + } +} diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/metrics/PostgresLockGauge.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/metrics/PostgresLockGauge.scala new file mode 100644 index 0000000000..b12bc24435 --- /dev/null +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/metrics/PostgresLockGauge.scala @@ -0,0 +1,54 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesgenerator.metrics + +import cats.effect._ +import cats.syntax.all._ +import eu.timepit.refined.api.Refined +import eu.timepit.refined.collection.NonEmpty +import eu.timepit.refined.auto._ +import io.renku.metrics.{LabeledGauge, MetricsRegistry, PositiveValuesLabeledGauge} + +trait PostgresLockGauge[F[_]] extends LabeledGauge[F, PostgresLockGauge.Label] + +object PostgresLockGauge { + + sealed trait Label + object Label { + case object CurrentLocks extends Label { + override def toString = "current_lock_count" + } + case object Waiting extends Label { + override def toString = "waiting_count" + } + } + + def apply[F[_]: Async: MetricsRegistry](dbId: String Refined NonEmpty): F[PostgresLockGauge[F]] = + MetricsRegistry[F] + .register( + new PositiveValuesLabeledGauge[F, Label]( + name = dbId, + help = s"Statistics for postgres locks", + labelName = "postgres_lock_stats", + resetDataFetch = () => Async[F].pure(Map.empty) + ) with PostgresLockGauge[F] + ) + .widen + +} diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/metrics/PostgresLockHistogram.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/metrics/PostgresLockHistogram.scala new file mode 100644 index 0000000000..28af8892b9 --- /dev/null +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/metrics/PostgresLockHistogram.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesgenerator.metrics + +import cats.MonadThrow +import cats.syntax.all._ +import eu.timepit.refined.auto._ +import io.renku.metrics.{LabeledHistogram, LabeledHistogramImpl, MetricsRegistry} + +import scala.concurrent.duration._ + +trait PostgresLockHistogram[F[_]] extends LabeledHistogram[F] + +object PostgresLockHistogram { + def apply[F[_]: MonadThrow: MetricsRegistry]: F[PostgresLockHistogram[F]] = MetricsRegistry[F].register { + new LabeledHistogramImpl[F]( + name = "postgres_lock_wait_times", + help = "PostgresLock wait times", + labelName = "object_id", + buckets = Seq(.05, .1, .5, 1, 2.5, 5, 10, 50), + maybeThreshold = 200.millis.some + ) with PostgresLockHistogram[F] + }.widen +} diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/cleanup/EventHandlerSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/cleanup/EventHandlerSpec.scala index 50c409ab42..41d245e154 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/cleanup/EventHandlerSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/cleanup/EventHandlerSpec.scala @@ -18,6 +18,7 @@ package io.renku.triplesgenerator.events.consumers.cleanup +import cats.data.Kleisli import cats.effect.{IO, Ref} import cats.syntax.all._ import io.circe.syntax._ @@ -26,8 +27,11 @@ import io.renku.events.consumers.ConsumersModelGenerators.{consumerProjects, eve import io.renku.events.consumers.ProcessExecutor import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.generators.Generators.Implicits._ +import io.renku.graph.model.projects import io.renku.interpreters.TestLogger +import io.renku.lock.Lock import io.renku.testtools.IOSpec +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.api.events.CleanUpEvent import io.renku.triplesgenerator.events.consumers.TSReadinessForEventsChecker import org.scalamock.scalatest.MockFactory @@ -60,6 +64,19 @@ class EventHandlerSpec extends AnyWordSpec with MockFactory with IOSpec with sho handler.createHandlingDefinition().process(event).unsafeRunSync() shouldBe () } + + "lock while executing" in new TestCase { + val test = Ref.unsafe[IO, Int](0) + override val tsWriteLock: TsWriteLock[IO] = + Lock.from[IO, projects.Path](Kleisli(_ => test.update(_ + 1)))(Kleisli(_ => test.update(_ + 1))) + + val event = consumerProjects.map(CleanUpEvent(_)).generateOne + + (eventProcessor.process _).expects(event.project).returns(().pure[IO]) + + handler.createHandlingDefinition().process(event).unsafeRunSync() shouldBe () + test.get.unsafeRunSync() shouldBe 2 + } } "handlingDefinition.precondition" should { @@ -93,11 +110,14 @@ class EventHandlerSpec extends AnyWordSpec with MockFactory with IOSpec with sho val renewSubscriptionCalled = Ref.unsafe[IO, Boolean](false) (subscriptionMechanism.renewSubscription _).expects().returns(renewSubscriptionCalled.set(true)) - val handler = new EventHandler[IO](categoryName, - tsReadinessChecker, - eventProcessor, - subscriptionMechanism, - mock[ProcessExecutor[IO]] + def tsWriteLock: TsWriteLock[IO] = Lock.none[IO, projects.Path] + lazy val handler = new EventHandler[IO]( + categoryName, + tsReadinessChecker, + eventProcessor, + subscriptionMechanism, + mock[ProcessExecutor[IO]], + tsWriteLock ) } } diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandlerSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandlerSpec.scala index 5b3c35773b..0db389a238 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandlerSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandlerSpec.scala @@ -18,6 +18,7 @@ package io.renku.triplesgenerator.events.consumers.membersync +import cats.data.Kleisli import cats.effect.{IO, Ref} import cats.syntax.all._ import io.circe.literal._ @@ -32,7 +33,9 @@ import io.renku.generators.Generators.jsons import io.renku.graph.model.GraphModelGenerators.projectPaths import io.renku.graph.model.projects import io.renku.interpreters.TestLogger +import io.renku.lock.Lock import io.renku.testtools.IOSpec +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.events.consumers.TSReadinessForEventsChecker import org.scalamock.scalatest.MockFactory import org.scalatest.EitherValues @@ -67,6 +70,17 @@ class EventHandlerSpec extends AnyWordSpec with IOSpec with MockFactory with sho handler.createHandlingDefinition().process(eventProject).unsafeRunSync() shouldBe () } + + "lock while executing" in new TestCase { + val test = Ref.unsafe[IO, Int](0) + override val tsWriteLock: TsWriteLock[IO] = + Lock.from[IO, projects.Path](Kleisli(_ => test.update(_ + 1)))(Kleisli(_ => test.update(_ + 1))) + + (membersSynchronizer.synchronizeMembers _).expects(eventProject).returns(().pure[IO]) + + handler.createHandlingDefinition().process(eventProject).unsafeRunSync() shouldBe () + test.get.unsafeRunSync() shouldBe 2 + } } "handlingDefinition.precondition" should { @@ -102,11 +116,14 @@ class EventHandlerSpec extends AnyWordSpec with IOSpec with MockFactory with sho val renewSubscriptionCalled = Ref.unsafe[IO, Boolean](false) (subscriptionMechanism.renewSubscription _).expects().returns(renewSubscriptionCalled.set(true)) - val handler = new EventHandler[IO](categoryName, - tsReadinessChecker, - membersSynchronizer, - subscriptionMechanism, - mock[ProcessExecutor[IO]] + def tsWriteLock: TsWriteLock[IO] = Lock.none[IO, projects.Path] + lazy val handler = new EventHandler[IO]( + categoryName, + tsReadinessChecker, + membersSynchronizer, + subscriptionMechanism, + mock[ProcessExecutor[IO]], + tsWriteLock ) } diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/EventHandlerSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/EventHandlerSpec.scala index a4ac25184c..08d8fea59d 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/EventHandlerSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/syncrepometadata/EventHandlerSpec.scala @@ -18,13 +18,17 @@ package io.renku.triplesgenerator.events.consumers.syncrepometadata -import cats.effect.IO +import cats.data.Kleisli +import cats.effect._ import cats.effect.testing.scalatest.AsyncIOSpec import cats.syntax.all._ import io.renku.events.consumers.ConsumersModelGenerators.eventSchedulingResults import io.renku.events.consumers.ProcessExecutor import io.renku.generators.Generators.Implicits._ +import io.renku.graph.model.projects import io.renku.interpreters.TestLogger +import io.renku.lock.Lock +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.api.events.Generators.syncRepoMetadataEvents import io.renku.triplesgenerator.events.consumers.TSReadinessForEventsChecker import io.renku.triplesgenerator.events.consumers.syncrepometadata.processor.EventProcessor @@ -37,23 +41,43 @@ class EventHandlerSpec extends AsyncWordSpec with AsyncIOSpec with should.Matche "handlingDefinition.decode" should { "be EventDecoder.decode" in { + val tc = new TestCase - expectTSReadinessCheckerCall + tc.expectTSReadinessCheckerCall - handler.createHandlingDefinition().decode shouldBe EventDecoder.decode + tc.handler.createHandlingDefinition().decode shouldBe EventDecoder.decode } } "handlingDefinition.process" should { "be the EventProcessor.process" in { + val tc = new TestCase + tc.expectTSReadinessCheckerCall - expectTSReadinessCheckerCall + syncRepoMetadataEvents[IO].map(_.generateOne).flatMap { event => + (tc.eventProcessor.process _).expects(event).returns(().pure[IO]) + + tc.handler.createHandlingDefinition().process(event).assertNoException + } + } + + "lock while executing" in { + val test = Ref.unsafe[IO, Int](0) + val tc = new TestCase { + override val tsWriteLock: TsWriteLock[IO] = + Lock.from[IO, projects.Path](Kleisli(_ => test.update(_ + 1)))(Kleisli(_ => test.update(_ + 1))) + } + + tc.expectTSReadinessCheckerCall syncRepoMetadataEvents[IO].map(_.generateOne).flatMap { event => - (eventProcessor.process _).expects(event).returns(().pure[IO]) + (tc.eventProcessor.process _).expects(event).returns(().pure[IO]) - handler.createHandlingDefinition().process(event).asserting(_ shouldBe ()) + tc.handler.createHandlingDefinition().process(event).map { r => + r shouldBe () + test.get.unsafeRunSync() shouldBe 2 + } } } } @@ -61,33 +85,44 @@ class EventHandlerSpec extends AsyncWordSpec with AsyncIOSpec with should.Matche "handlingDefinition.precondition" should { "be the TSReadinessForEventsChecker.verifyTSReady" in { + val tc = new TestCase + tc.expectTSReadinessCheckerCall - expectTSReadinessCheckerCall - - handler.createHandlingDefinition().precondition.asserting(_ shouldBe readinessCheckerResult) + tc.handler.createHandlingDefinition().precondition.asserting(_ shouldBe tc.readinessCheckerResult) } } "handlingDefinition.onRelease" should { "not be defined" in { + val tc = new TestCase + tc.expectTSReadinessCheckerCall - expectTSReadinessCheckerCall - - handler.createHandlingDefinition().onRelease shouldBe None + tc.handler.createHandlingDefinition().onRelease shouldBe None } } - private implicit val logger: TestLogger[IO] = TestLogger[IO]() + class TestCase { + implicit val logger: TestLogger[IO] = TestLogger[IO]() - private lazy val tsReadinessChecker = mock[TSReadinessForEventsChecker[IO]] - private lazy val readinessCheckerResult = eventSchedulingResults.generateSome + lazy val tsReadinessChecker = mock[TSReadinessForEventsChecker[IO]] + lazy val readinessCheckerResult = eventSchedulingResults.generateSome - private def expectTSReadinessCheckerCall = - (() => tsReadinessChecker.verifyTSReady).expects().returns(readinessCheckerResult.pure[IO]) + def expectTSReadinessCheckerCall = + (() => tsReadinessChecker.verifyTSReady).expects().returns(readinessCheckerResult.pure[IO]) - private lazy val eventProcessor = mock[EventProcessor[IO]] + lazy val eventProcessor = mock[EventProcessor[IO]] - private lazy val handler = - new EventHandler[IO](categoryName, tsReadinessChecker, EventDecoder, eventProcessor, mock[ProcessExecutor[IO]]) + def tsWriteLock: TsWriteLock[IO] = Lock.none[IO, projects.Path] + + lazy val handler = + new EventHandler[IO]( + categoryName, + tsReadinessChecker, + EventDecoder, + eventProcessor, + mock[ProcessExecutor[IO]], + tsWriteLock + ) + } } diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/EventHandlerSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/EventHandlerSpec.scala index 80f23fb0f3..dcbc7b47d0 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/EventHandlerSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/minprojectinfo/EventHandlerSpec.scala @@ -20,6 +20,7 @@ package io.renku.triplesgenerator.events.consumers package tsprovisioning.minprojectinfo import CategoryGenerators._ +import cats.data.Kleisli import cats.effect.{IO, Ref} import cats.syntax.all._ import io.circe.Encoder @@ -30,8 +31,11 @@ import io.renku.events.consumers.ProcessExecutor import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.events.consumers.ConsumersModelGenerators.eventSchedulingResults import io.renku.generators.Generators.Implicits._ +import io.renku.graph.model.projects import io.renku.interpreters.TestLogger +import io.renku.lock.Lock import io.renku.testtools.IOSpec +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import org.scalamock.scalatest.MockFactory import org.scalatest.matchers.should import org.scalatest.wordspec.AnyWordSpec @@ -62,6 +66,19 @@ class EventHandlerSpec extends AnyWordSpec with IOSpec with MockFactory with sho handler.createHandlingDefinition().process(event).unsafeRunSync() shouldBe () } + + "lock while executing" in new TestCase { + val test = Ref.unsafe[IO, Int](0) + override val tsWriteLock: TsWriteLock[IO] = + Lock.from[IO, projects.Path](Kleisli(_ => test.update(_ + 1)))(Kleisli(_ => test.update(_ + 1))) + + val event = minProjectInfoEvents.generateOne + + (eventProcessor.process _).expects(event).returns(().pure[IO]) + + handler.createHandlingDefinition().process(event).unsafeRunSync() shouldBe () + test.get.unsafeRunSync() shouldBe 2 + } } "handlingDefinition.precondition" should { @@ -94,12 +111,14 @@ class EventHandlerSpec extends AnyWordSpec with IOSpec with MockFactory with sho (subscriptionMechanism.renewSubscription _).expects().returns(renewSubscriptionCalled.set(true)) val eventProcessor = mock[EventProcessor[IO]] - - val handler = new EventHandler[IO](categoryName, - tsReadinessChecker, - subscriptionMechanism, - eventProcessor, - mock[ProcessExecutor[IO]] + def tsWriteLock = Lock.none[IO, projects.Path] + lazy val handler = new EventHandler[IO]( + categoryName, + tsReadinessChecker, + subscriptionMechanism, + eventProcessor, + mock[ProcessExecutor[IO]], + tsWriteLock ) } diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/EventHandlerSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/EventHandlerSpec.scala index cb86c4efa8..23850fb115 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/EventHandlerSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/tsprovisioning/triplesgenerated/EventHandlerSpec.scala @@ -19,14 +19,18 @@ package io.renku.triplesgenerator.events.consumers.tsprovisioning.triplesgenerated import CategoryGenerators._ +import cats.data.Kleisli import cats.effect.{IO, Ref} import cats.syntax.all._ import io.renku.events.consumers.ProcessExecutor import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.events.consumers.ConsumersModelGenerators.eventSchedulingResults import io.renku.generators.Generators.Implicits._ +import io.renku.graph.model.projects import io.renku.interpreters.TestLogger +import io.renku.lock.Lock import io.renku.testtools.IOSpec +import io.renku.triplesgenerator.TgLockDB.TsWriteLock import io.renku.triplesgenerator.events.consumers.TSReadinessForEventsChecker import org.scalamock.scalatest.MockFactory import org.scalatest.matchers.should @@ -51,6 +55,19 @@ class EventHandlerSpec extends AnyWordSpec with IOSpec with MockFactory with sho handler.createHandlingDefinition().process(event).unsafeRunSync() shouldBe () } + + "lock while executing" in new TestCase { + val test = Ref.unsafe[IO, Int](0) + override val tsWriteLock: TsWriteLock[IO] = + Lock.from[IO, projects.Path](Kleisli(_ => test.update(_ + 1)))(Kleisli(_ => test.update(_ + 1))) + + val event = triplesGeneratedEvents.generateOne + + (eventProcessor.process _).expects(event).returns(().pure[IO]) + + handler.createHandlingDefinition().process(event).unsafeRunSync() shouldBe () + test.get.unsafeRunSync() shouldBe 2 + } } "handlingDefinition.precondition" should { @@ -84,12 +101,16 @@ class EventHandlerSpec extends AnyWordSpec with IOSpec with MockFactory with sho val eventProcessor = mock[EventProcessor[IO]] - val handler = new EventHandler[IO](categoryName, - tsReadinessChecker, - EventDecoder(), - subscriptionMechanism, - eventProcessor, - mock[ProcessExecutor[IO]] + def tsWriteLock: TsWriteLock[IO] = Lock.none[IO, projects.Path] + + lazy val handler = new EventHandler[IO]( + categoryName, + tsReadinessChecker, + EventDecoder(), + subscriptionMechanism, + eventProcessor, + mock[ProcessExecutor[IO]], + tsWriteLock ) } }