Skip to content

Commit

Permalink
feat: avoid concurrent writes to jena (#1577)
Browse files Browse the repository at this point in the history
Use postgres advisory locks to manage writes to jena during event processing.
  • Loading branch information
eikek authored Jul 12, 2023
1 parent 720149b commit 15255b2
Show file tree
Hide file tree
Showing 54 changed files with 1,723 additions and 177 deletions.
4 changes: 0 additions & 4 deletions acceptance-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,3 @@ services {
api-url = "http://localhost:9004/knowledge-graph"
}
}

projects-tokens {
db-port = 49998
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class CommitHistoryChangesSpec

sleep((10 seconds).toMillis)

`wait for events to be processed`(project.id, user.accessToken)
`wait for events to be processed`(project.id, user.accessToken, 5)

eventually {
EventLog.findEvents(project.id, events.EventStatus.TriplesStore).toSet shouldBe newCommits.toList.toSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class CommitSyncFlowsSpec extends AcceptanceSpec with ApplicationServices with T
.status shouldBe Accepted

And("relevant commit events are processed")
`wait for events to be processed`(project.id, user.accessToken)
`wait for events to be processed`(project.id, user.accessToken, 0)

Then("the non missed events should be in the Triples Store")
eventually {
Expand All @@ -83,7 +83,7 @@ class CommitSyncFlowsSpec extends AcceptanceSpec with ApplicationServices with T
EventLog.forceCategoryEventTriggering(CategoryName("COMMIT_SYNC"), project.id)

And("commit events for the missed event are created and processed")
`wait for events to be processed`(project.id, user.accessToken)
`wait for events to be processed`(project.id, user.accessToken, 5)

Then("triples for both of the project's commits should be in the Triples Store")
eventually {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class EventFlowsSpec extends AcceptanceSpec with ApplicationServices with TSProv
.status shouldBe Accepted

And("commit events are processed")
`wait for events to be processed`(project.id, user.accessToken)
`wait for events to be processed`(project.id, user.accessToken, 5)

Then(s"all the events should get the $TriplesStore status in the Event Log")
EventLog.findEvents(project.id).map(_._2).toSet shouldBe Set(TriplesStore)
Expand Down Expand Up @@ -89,7 +89,7 @@ class EventFlowsSpec extends AcceptanceSpec with ApplicationServices with TSProv
.status shouldBe Accepted

And("relevant commit events are processed")
`wait for events to be processed`(project.id, user.accessToken)
`wait for events to be processed`(project.id, user.accessToken, 5)

And(s"all the events should get the $GenerationNonRecoverableFailure status in the Event Log")
EventLog.findEvents(project.id).map(_._2).toSet shouldBe Set(GenerationNonRecoverableFailure)
Expand Down Expand Up @@ -151,7 +151,7 @@ class EventFlowsSpec extends AcceptanceSpec with ApplicationServices with TSProv
.status shouldBe Accepted

And("relevant commit events are processed")
`wait for events to be processed`(project.id, user.accessToken)
`wait for events to be processed`(project.id, user.accessToken, 5)

Then(s"all the events should get the $TransformationNonRecoverableFailure status in the Event Log")
EventLog.findEvents(project.id).map(_._2).toSet shouldBe Set(TransformationNonRecoverableFailure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class ProjectReProvisioningSpec extends AcceptanceSpec with ApplicationServices

Then("the old data in the TS should be replaced with the new")
sleep((10 seconds).toMillis)
`wait for events to be processed`(project.id, user.accessToken)
`wait for events to be processed`(project.id, user.accessToken, 5)

eventually {
knowledgeGraphClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,22 @@

package io.renku.graph.acceptancetests.db

import cats.Applicative
import cats.data.{Kleisli, NonEmptyList}
import cats.effect.IO._
import cats.effect.unsafe.IORuntime
import cats.effect.{IO, Resource}
import cats.syntax.all._
import com.dimafeng.testcontainers.FixedHostPortGenericContainer
import io.renku.db.{DBConfigProvider, PostgresContainer, SessionResource}
import io.renku.db.{DBConfigProvider, SessionResource}
import io.renku.eventlog._
import io.renku.events.CategoryName
import io.renku.graph.model.events.{CommitId, EventId, EventStatus}
import io.renku.graph.model.projects
import io.renku.graph.model.projects.GitLabId
import natchez.Trace.Implicits.noop
import org.typelevel.log4cats.Logger
import skunk._
import skunk.codec.text.varchar
import skunk.implicits._

import scala.collection.immutable
import scala.util.Try

object EventLog extends TypeSerializers {
Expand Down Expand Up @@ -95,18 +91,6 @@ object EventLog extends TypeSerializers {
private lazy val dbConfig: DBConfigProvider.DBConfig[EventLogDB] =
new EventLogDbConfigProvider[Try].get().fold(throw _, identity)

private lazy val postgresContainer = FixedHostPortGenericContainer(
imageName = PostgresContainer.image,
env = immutable.Map("POSTGRES_USER" -> dbConfig.user.value,
"POSTGRES_PASSWORD" -> dbConfig.pass.value,
"POSTGRES_DB" -> dbConfig.name.value
),
exposedPorts = Seq(dbConfig.port.value),
exposedHostPort = dbConfig.port.value,
exposedContainerPort = dbConfig.port.value,
command = Seq(s"-p ${dbConfig.port.value}")
)

def removeGlobalCommitSyncRow(projectId: GitLabId)(implicit ioRuntime: IORuntime): Unit = execute { session =>
val command: Command[projects.GitLabId] = sql"""
DELETE FROM subscription_category_sync_time
Expand All @@ -116,19 +100,11 @@ object EventLog extends TypeSerializers {
}

def startDB()(implicit logger: Logger[IO]): IO[Unit] = for {
_ <- Applicative[IO].unlessA(postgresContainer.container.isRunning)(IO(postgresContainer.start()))
_ <- PostgresDB.startPostgres
_ <- PostgresDB.initializeDatabase(dbConfig)
_ <- logger.info("event_log DB started")
} yield ()

private lazy val sessionResource: Resource[IO, SessionResource[IO, EventLogDB]] =
Session
.pooled(
host = postgresContainer.host,
port = dbConfig.port.value,
database = dbConfig.name.value,
user = dbConfig.user.value,
password = Some(dbConfig.pass.value),
max = dbConfig.connectionPool.value
)
.map(new SessionResource(_))
PostgresDB.sessionPoolResource(dbConfig)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2023 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.graph.acceptancetests.db

import cats.effect._
import com.dimafeng.testcontainers.FixedHostPortGenericContainer
import eu.timepit.refined.auto._
import io.renku.db.DBConfigProvider.DBConfig
import io.renku.db.{PostgresContainer, SessionResource}
import skunk.codec.all._
import skunk.implicits._
import skunk._
import natchez.Trace.Implicits.noop
import org.typelevel.log4cats.Logger
import scala.concurrent.duration._

// Acceptance-test helper managing ONE shared PostgreSQL testcontainer for all services.
// Each service DB (event_log, projects_tokens, TG lock) is created inside this single
// instance via initializeDatabase, replacing the previous container-per-database setup.
object PostgresDB {
  // Flipped to true by the first caller of startPostgres; guards against two callers
  // both trying to start the container. NOTE(review): Ref.unsafe is eager, impure
  // construction — acceptable in test scaffolding.
  private[this] val starting: Ref[IO, Boolean] = Ref.unsafe[IO, Boolean](false)

  // Connection details of the shared container itself: superuser "at"/"at" on the
  // default "postgres" database, fixed host port 5432. Per-service configs passed to
  // initializeDatabase/sessionPool layer their own role + database on top of this.
  private val dbConfig = DBConfig[Any](
    host = "localhost",
    port = 5432,
    name = "postgres",
    user = "at",
    pass = "at",
    connectionPool = 8
  )

  // Container bound to a fixed host port so services reading static config files can
  // connect without discovering a mapped port. POSTGRES_DB is deliberately not set:
  // service databases are created later by initializeDatabase.
  private val postgresContainer = {
    val cfg = dbConfig
    FixedHostPortGenericContainer(
      imageName = PostgresContainer.image,
      env = Map(
        "POSTGRES_USER" -> cfg.user.value,
        "POSTGRES_PASSWORD" -> cfg.pass.value
      ),
      exposedPorts = Seq(cfg.port.value),
      exposedHostPort = cfg.port.value,
      exposedContainerPort = cfg.port.value,
      // make postgres listen on the configured port inside the container too
      command = Seq(s"-p ${cfg.port.value}")
    )
  }

  // Idempotent start: the first caller (starting was false) launches the container —
  // unless it already runs — and blocks until it accepts connections; every subsequent
  // caller only waits for readiness. NOTE(review): getAndUpdate makes the flag check
  // atomic, but a second caller can reach waitForPostgres before the first has started
  // the container — waitForPostgres tolerates that by polling.
  def startPostgres(implicit L: Logger[IO]) =
    starting.getAndUpdate(_ => true).flatMap {
      case false =>
        IO.unlessA(postgresContainer.container.isRunning)(
          IO(postgresContainer.start()) *> waitForPostgres *> L.info(
            "PostgreSQL database started"
          )
        )

      case true =>
        waitForPostgres
    }

  // Polls the server every 0.5s with "SELECT 1" until a connection succeeds, giving up
  // after 100 attempts (~50s). NOTE(review): on timeout this completes *silently*
  // rather than failing — a dead container surfaces later as connection errors.
  private def waitForPostgres =
    fs2.Stream
      .awakeDelay[IO](0.5.seconds)
      .evalMap { _ =>
        Session
          .single[IO](
            host = dbConfig.host,
            port = dbConfig.port,
            user = dbConfig.user.value,
            password = Some(dbConfig.pass.value),
            database = dbConfig.name.value
          )
          .use(_.unique(sql"SELECT 1".query(int4)))
          .attempt
      }
      .map(_.fold(_ => 1, _ => 0)) // 0 = reachable, 1 = still failing
      .take(100)
      .find(_ == 0)
      .compile
      .drain

  // Pooled sessions for a service database living in the shared container. The service
  // config supplies database/user/password, while host, port and pool size come from
  // the container's own dbConfig — all service DBs share the single 5432 instance, so
  // the per-service port from dbCfg is intentionally ignored.
  def sessionPool(dbCfg: DBConfig[_]): Resource[IO, Resource[IO, Session[IO]]] =
    Session
      .pooled[IO](
        host = postgresContainer.host,
        port = dbConfig.port.value,
        database = dbCfg.name.value,
        user = dbCfg.user.value,
        password = Some(dbCfg.pass.value),
        max = dbConfig.connectionPool.value
      )

  // Same as sessionPool but wrapped in the project's SessionResource abstraction, as
  // expected by the services' DB layers.
  def sessionPoolResource[A](dbCfg: DBConfig[_]): Resource[IO, SessionResource[IO, A]] =
    sessionPool(dbCfg).map(new SessionResource[IO, A](_))

  // Creates the role, database and grants a service expects, connecting as the
  // container superuser. Uses literal (#$) interpolation — safe only because cfg comes
  // from trusted test configuration, never from user input. NOTE(review): not
  // idempotent — "create role"/"create database" fail if run twice for the same cfg.
  def initializeDatabase(cfg: DBConfig[_]): IO[Unit] = {
    val session = Session.single[IO](
      host = dbConfig.host,
      port = dbConfig.port,
      user = dbConfig.user.value,
      password = Some(dbConfig.pass.value),
      database = dbConfig.name.value
    )

    // note: it would be simpler to use the default user that is created with the container, but that requires
    // to first refactor how the configuration is loaded. Currently services load it from the file every time and so
    // this creates the users as expected from the given config
    val createRole: Command[Void] =
      sql"create role #${cfg.user.value} with password '#${cfg.pass.value}' superuser login".command
    val createDatabase: Command[Void] = sql"create database #${cfg.name.value}".command
    val grants: Command[Void] = sql"grant all on database #${cfg.name.value} to #${cfg.user.value}".command

    session.use { s =>
      s.execute(createRole) *>
        s.execute(createDatabase) *>
        s.execute(grants)
    }.void
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@

package io.renku.graph.acceptancetests.db

import cats.Applicative
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import com.dimafeng.testcontainers.FixedHostPortGenericContainer
import io.renku.db.{DBConfigProvider, PostgresContainer}
import io.renku.db.DBConfigProvider
import io.renku.tokenrepository.repository.{ProjectsTokensDB, ProjectsTokensDbConfigProvider}
import org.typelevel.log4cats.Logger

Expand All @@ -33,20 +30,9 @@ object TokenRepository {
private lazy val dbConfig: DBConfigProvider.DBConfig[ProjectsTokensDB] =
new ProjectsTokensDbConfigProvider[Try].get().fold(throw _, identity)

private lazy val postgresContainer = FixedHostPortGenericContainer(
imageName = PostgresContainer.image,
env = Map("POSTGRES_USER" -> dbConfig.user.value,
"POSTGRES_PASSWORD" -> dbConfig.pass.value,
"POSTGRES_DB" -> dbConfig.name.value
),
exposedPorts = Seq(dbConfig.port.value),
exposedHostPort = dbConfig.port.value,
exposedContainerPort = dbConfig.port.value,
command = Seq(s"-p ${dbConfig.port.value}")
)

def startDB()(implicit ioRuntime: IORuntime, logger: Logger[IO]): IO[Unit] = for {
_ <- Applicative[IO].unlessA(postgresContainer.container.isRunning)(IO(postgresContainer.start()))
def startDB()(implicit logger: Logger[IO]): IO[Unit] = for {
_ <- PostgresDB.startPostgres
_ <- PostgresDB.initializeDatabase(dbConfig)
_ <- logger.info("projects_tokens DB started")
} yield ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,32 @@

package io.renku.graph.acceptancetests.db

import cats.effect.{IO, Resource, Temporal}
import cats.{Applicative, Monad}
import cats.effect.{IO, Temporal}
import eu.timepit.refined.auto._
import io.renku.db.DBConfigProvider
import io.renku.triplesgenerator.TgLockDB.SessionResource
import io.renku.triplesgenerator.{TgLockDB, TgLockDbConfigProvider}
import io.renku.triplesstore._
import org.typelevel.log4cats.Logger

import scala.concurrent.duration._
import scala.util.Try

object TriplesStore extends InMemoryJena with ProjectsDataset with MigrationsDataset {

protected override val jenaRunMode: JenaRunMode = JenaRunMode.FixedPortContainer(3030)

private val dbConfig: DBConfigProvider.DBConfig[TgLockDB] =
new TgLockDbConfigProvider[Try].get().fold(throw _, identity)

lazy val sessionResource: Resource[IO, SessionResource[IO]] =
PostgresDB.sessionPoolResource(dbConfig)

def start()(implicit logger: Logger[IO]): IO[Unit] = for {
_ <- Applicative[IO].unlessA(isRunning)(IO(container.start()))
_ <- PostgresDB.startPostgres
_ <- PostgresDB.initializeDatabase(dbConfig)
_ <- waitForReadiness
_ <- logger.info("Triples Store started")
} yield ()
Expand All @@ -40,4 +52,5 @@ object TriplesStore extends InMemoryJena with ProjectsDataset with MigrationsDat

private def waitForReadiness(implicit logger: Logger[IO]): IO[Unit] =
Monad[IO].whileM_(IO(!isRunning))(logger.info("Waiting for TS") >> (Temporal[IO] sleep (500 millis)))

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package io.renku.graph.acceptancetests.flows

import cats.data.NonEmptyList
import cats.effect.unsafe.IORuntime
import cats.syntax.all._
import io.renku.events.CategoryName
import io.renku.generators.Generators.Implicits._
import io.renku.graph.acceptancetests.data
Expand Down Expand Up @@ -74,15 +75,20 @@ trait TSProvisioning
sleep((5 seconds).toMillis)
}

`wait for events to be processed`(project.id, accessToken)
`wait for events to be processed`(project.id, accessToken, 5)
}

def `wait for events to be processed`(projectId: projects.GitLabId, accessToken: AccessToken): Assertion =
def `wait for events to be processed`(
projectId: projects.GitLabId,
accessToken: AccessToken,
expectedMinTotal: Int
): Assertion =
eventually {
val response = webhookServiceClient.`GET projects/:id/events/status`(projectId, accessToken)
response.status shouldBe Ok
response.jsonBody.hcursor.downField("activated").as[Boolean].value shouldBe true
response.jsonBody.hcursor.downField("progress").downField("percentage").as[Double].value shouldBe 100d
response.jsonBody.hcursor.downField("progress").downField("total").as[Int].value should be >= expectedMinTotal
}

def `check hook cannot be found`(projectId: projects.GitLabId, accessToken: AccessToken): Assertion = eventually {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ class DatasetsResourcesSpec
gitLabStub.setupProject(project, commitId)
mockCommitDataOnTripleGenerator(project, toPayloadJsonLD(project), commitId)
`data in the Triples Store`(project, commitId, creator.accessToken)
`wait for events to be processed`(project.id, creator.accessToken)
`wait for events to be processed`(project.id, creator.accessToken, 5)

When("an authenticated and authorised user fetches dataset details through GET knowledge-graph/datasets/:id")
val detailsResponse =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.scalatest.time.{Minutes, Second, Span}
trait AcceptanceTestPatience extends AbstractPatienceConfiguration {

implicit override val patienceConfig: PatienceConfig = PatienceConfig(
timeout = scaled(Span(4, Minutes)),
timeout = scaled(Span(6, Minutes)),
interval = scaled(Span(1, Second))
)
}
Loading

0 comments on commit 15255b2

Please sign in to comment.